Эх сурвалжийг харах

remove xhr from socket
add enum for server's side events

afrokick 6 жил өмнө
parent
commit
9c5da55086

+ 4 - 3
lib/dataconnection.ts

@@ -4,7 +4,8 @@ import { Reliable } from "reliable";
 import {
   ConnectionType,
   ConnectionEventType,
-  SerializationType
+  SerializationType,
+  ServerMessageType
 } from "./enums";
 import { Peer } from "./peer";
 import { BaseConnection } from "./baseconnection";
@@ -277,13 +278,13 @@ export class DataConnection extends BaseConnection {
     const payload = message.payload;
 
     switch (message.type) {
-      case "ANSWER":
+      case ServerMessageType.Answer:
         this._peerBrowser = payload.browser;
 
         // Forward to negotiator
         Negotiator.handleSDP(message.type, this, payload.sdp);
         break;
-      case "CANDIDATE":
+      case ServerMessageType.Candidate:
         Negotiator.handleCandidate(this, payload.candidate);
         break;
       default:

+ 14 - 0
lib/enums.ts

@@ -47,3 +47,17 @@ export enum SocketEventType {
   Error = "error",
   Close = "close"
 }
+
+export enum ServerMessageType {
+  Heartbeat = "HEARTBEAT",
+  Candidate = "CANDIDATE",
+  Offer = "OFFER",
+  Answer = "ANSWER",
+  Open = "OPEN", // The connection to the server is open.
+  Error = "ERROR", // Server error.
+  IdTaken = "ID-TAKEN", // The selected ID is taken.
+  InvalidKey = "INVALID-KEY", // The given API key cannot be found.
+  Leave = "LEAVE", // Another peer has closed its connection to this peer.
+  Expire = "EXPIRE" // The offer sent to a peer has expired without response.
+
+}

+ 3 - 3
lib/mediaconnection.ts

@@ -1,6 +1,6 @@
 import { util } from "./util";
 import Negotiator from "./negotiator";
-import { ConnectionType, ConnectionEventType } from "./enums";
+import { ConnectionType, ConnectionEventType, ServerMessageType } from "./enums";
 import { Peer } from "./peer";
 import { BaseConnection } from "./baseconnection";
 import { ServerMessage } from "./servermessage";
@@ -46,12 +46,12 @@ export class MediaConnection extends BaseConnection {
     const payload = message.payload;
 
     switch (message.type) {
-      case "ANSWER":
+      case ServerMessageType.Answer:
         // Forward to negotiator
         Negotiator.handleSDP(type, this, payload.sdp);
         this._open = true;
         break;
-      case "CANDIDATE":
+      case ServerMessageType.Candidate:
         Negotiator.handleCandidate(this, payload.candidate);
         break;
       default:

+ 4 - 5
lib/negotiator.ts

@@ -7,9 +7,8 @@ import {
 import * as Reliable from "reliable";
 import { MediaConnection } from "./mediaconnection";
 import { DataConnection } from "./dataconnection";
-import { ConnectionType, PeerErrorType, ConnectionEventType } from "./enums";
+import { ConnectionType, PeerErrorType, ConnectionEventType, ServerMessageType } from "./enums";
 import { BaseConnection } from "./baseconnection";
-import { utils } from "mocha";
 
 /**
  * Manages all negotiations between Peers.
@@ -133,7 +132,7 @@ class Negotiator {
       if (evt.candidate) {
         util.log("Received ICE candidates for:", peerId);
         provider.socket.send({
-          type: "CANDIDATE",
+          type: ServerMessageType.Candidate,
           payload: {
             candidate: evt.candidate,
             type: connectionType,
@@ -273,7 +272,7 @@ class Negotiator {
         }
 
         connection.provider.socket.send({
-          type: "OFFER",
+          type: ServerMessageType.Offer,
           payload,
           dst: connection.peer
         });
@@ -313,7 +312,7 @@ class Negotiator {
         util.log(`Set localDescription:`, answer, `for:${connection.peer}`);
 
         connection.provider.socket.send({
-          type: "ANSWER",
+          type: ServerMessageType.Answer,
           payload: {
             sdp: answer,
             type: connection.type,

+ 9 - 13
lib/peer.ts

@@ -8,7 +8,8 @@ import {
   PeerErrorType,
   PeerEventType,
   SocketEventType,
-  SerializationType
+  SerializationType,
+  ServerMessageType
 } from "./enums";
 import { BaseConnection } from "./baseconnection";
 import { ServerMessage } from "./servermessage";
@@ -19,7 +20,6 @@ class PeerOptions implements PeerJSOption {
   debug: number; // 1: Errors, 2: Warnings, 3: All logs
   host: string;
   port: number;
-  wsport: number;
   path: string;
   key: string;
   token: string;
@@ -163,7 +163,6 @@ export class Peer extends EventEmitter {
       this._options.port,
       this._options.path,
       this._options.key,
-      this._options.wsport
     );
 
     this.socket.on(SocketEventType.Message, data => {
@@ -206,36 +205,33 @@ export class Peer extends EventEmitter {
     const peerId = message.src;
 
     switch (type) {
-      case "OPEN": // The connection to the server is open.
+      case ServerMessageType.Open: // The connection to the server is open.
         this.emit(PeerEventType.Open, this.id);
         this._open = true;
         break;
-      case "ERROR": // Server error.
+      case ServerMessageType.Error: // Server error.
         this._abort(PeerErrorType.ServerError, payload.msg);
         break;
-      case "ID-TAKEN": // The selected ID is taken.
+      case ServerMessageType.IdTaken: // The selected ID is taken.
         this._abort(PeerErrorType.UnavailableID, `ID "${this.id}" is taken`);
         break;
-      case "INVALID-KEY": // The given API key cannot be found.
+      case ServerMessageType.InvalidKey: // The given API key cannot be found.
         this._abort(
           PeerErrorType.InvalidKey,
           `API KEY "${this._options.key}" is invalid`
         );
         break;
-
-      //
-      case "LEAVE": // Another peer has closed its connection to this peer.
+      case ServerMessageType.Leave: // Another peer has closed its connection to this peer.
         util.log("Received leave message from", peerId);
         this._cleanupPeer(peerId);
         break;
-
-      case "EXPIRE": // The offer sent to a peer has expired without response.
+      case ServerMessageType.Expire: // The offer sent to a peer has expired without response.
         this.emitError(
           PeerErrorType.PeerUnavailable,
           "Could not connect to peer " + peerId
         );
         break;
-      case "OFFER": {
+      case ServerMessageType.Offer: {
         // we should consider switching this to CALL/CONNECT, but this is the least breaking option.
         const connectionId = payload.connectionId;
         let connection = this.getConnection(peerId, connectionId);

+ 3 - 1
lib/servermessage.ts

@@ -1,5 +1,7 @@
+import { ServerMessageType } from "./enums";
+
 export class ServerMessage {
-  type: string;
+  type: ServerMessageType;
   payload: any;
   src: string;
 }

+ 18 - 193
lib/socket.ts

@@ -1,105 +1,20 @@
 import { util } from "./util";
 import { EventEmitter } from "eventemitter3";
-import { SocketEventType } from "./enums";
-
-class HttpRequest {
-  index = 1;
-  readonly buffer: number[] = [];
-  previousRequest: HttpRequest;
-  private _xmlHttpRequest = new XMLHttpRequest();
-
-  private _onError = sender => { };
-  private _onSuccess = sender => { };
-
-  set onError(handler) {
-    this._onError = handler;
-  }
-
-  set onSuccess(handler) {
-    this._onSuccess = handler;
-  }
-
-  constructor(readonly streamIndex: number, private readonly _httpUrl: string) {
-    this._xmlHttpRequest.onerror = () => {
-      this._onError(self);
-    };
-
-    this._xmlHttpRequest.onreadystatechange = () => {
-      if (this.needsClearPreviousRequest()) {
-        this.clearPreviousRequest();
-      } else if (this.isSuccess()) {
-        this._onSuccess(self);
-      }
-    };
-  }
-
-  send(): void {
-    this._xmlHttpRequest.open(
-      "post",
-      this._httpUrl + "/id?i=" + this.streamIndex,
-      true
-    );
-    this._xmlHttpRequest.send(null);
-  }
-
-  abort(): void {
-    this._xmlHttpRequest.abort();
-    this._xmlHttpRequest = null;
-  }
-
-  isSuccess(): boolean {
-    return (
-      this._xmlHttpRequest.readyState > 2 &&
-      this._xmlHttpRequest.status === 200 &&
-      !!this._xmlHttpRequest.responseText
-    );
-  }
-
-  needsClearPreviousRequest(): boolean {
-    return this._xmlHttpRequest.readyState === 2 && !!this.previousRequest;
-  }
-
-  clearPreviousRequest(): void {
-    if (!this.previousRequest) return;
-
-    this.previousRequest.abort();
-    this.previousRequest = null;
-  }
-
-  getMessages(): string[] {
-    return this._xmlHttpRequest.responseText.split("\n");
-  }
-
-  hasBufferedIndices(): boolean {
-    return this.buffer.length > 0;
-  }
-
-  popBufferedIndex(): number {
-    return this.buffer.shift();
-  }
-
-  pushIndexToBuffer(index: number) {
-    this.buffer.push(index);
-  }
-}
+import { SocketEventType, ServerMessageType } from "./enums";
 
 /**
- * An abstraction on top of WebSockets and XHR streaming to provide fastest
+ * An abstraction on top of WebSockets to provide fastest
  * possible connection for peers.
  */
 export class Socket extends EventEmitter {
-  private readonly HTTP_TIMEOUT = 25000;//ms
   private readonly WEB_SOCKET_PING_INTERVAL = 20000;//ms
 
   private _disconnected = false;
   private _id: string;
   private _messagesQueue: Array<any> = [];
-  private _httpUrl: string;
   private _wsUrl: string;
   private _socket: WebSocket;
   private _wsPingTimer: any;
-  private _httpRequest: HttpRequest;
-  private _timeout: any;
 
   constructor(
     secure: any,
@@ -107,25 +22,20 @@ export class Socket extends EventEmitter {
     port: number,
     path: string,
     key: string,
-    wsport = port
   ) {
     super();
 
-    const httpProtocol = secure ? "https://" : "http://";
     const wsProtocol = secure ? "wss://" : "ws://";
 
-    this._httpUrl = httpProtocol + host + ":" + port + path + key;
-    this._wsUrl = wsProtocol + host + ":" + wsport + path + "peerjs?key=" + key;
+    this._wsUrl = wsProtocol + host + ":" + port + path + "peerjs?key=" + key;
   }
 
   /** Check in with ID or get one from server. */
   start(id: string, token: string): void {
     this._id = id;
 
-    this._httpUrl += "/" + id + "/" + token;
     this._wsUrl += "&id=" + id + "&token=" + token;
 
-    this._startXhrStream();
     this._startWebSocket();
   }
 
@@ -146,6 +56,7 @@ export class Socket extends EventEmitter {
         util.log("Invalid server message", event.data);
         return;
       }
+
       this.emit(SocketEventType.Message, data);
     };
 
@@ -153,49 +64,24 @@ export class Socket extends EventEmitter {
       util.log("Socket closed.", event);;
 
       this._disconnected = true;
+      clearTimeout(this._wsPingTimer);
+
       this.emit(SocketEventType.Disconnected);
     };
 
     // Take care of the queue of connections if necessary and make sure Peer knows
     // socket is open.
     this._socket.onopen = () => {
-      if (this._timeout) {
-        clearTimeout(this._timeout);
-        setTimeout(() => {
-          this._httpRequest.abort();
-          this._httpRequest = null;
-        }, 5000);
-      }
-
       this._sendQueuedMessages();
+
       util.log("Socket open");
 
-      this._wsPingTimer = setTimeout(() => { this._sendHeartbeat() }, this.WEB_SOCKET_PING_INTERVAL);
+      this._scheduleHeartbeat();
     };
   }
 
-  /** Start XHR streaming. */
-  private _startXhrStream(streamIndex: number = 0) {
-    const newRequest = new HttpRequest(streamIndex, this._httpUrl);
-    this._httpRequest = newRequest;
-
-    newRequest.onError = () => {
-      // If we get an error, likely something went wrong.
-      // Stop streaming.
-      clearTimeout(this._timeout);
-      this.emit(SocketEventType.Disconnected);
-    };
-
-    newRequest.onSuccess = () => {
-      this._handleStream(newRequest);
-    };
-
-    try {
-      newRequest.send();
-      this._setHTTPTimeout();
-    } catch (e) {
-      util.log("XMLHttpRequest not available; defaulting to WebSockets");
-    }
+  private _scheduleHeartbeat(): void {
+    this._wsPingTimer = setTimeout(() => { this._sendHeartbeat() }, this.WEB_SOCKET_PING_INTERVAL);
   }
 
   private _sendHeartbeat(): void {
@@ -204,67 +90,11 @@ export class Socket extends EventEmitter {
       return;
     }
 
-    const data = { type: 'HEARTBEAT' };
-    const message = JSON.stringify(data);
-    this._socket.send(message);
-
-    this._wsPingTimer = setTimeout(() => { this._sendHeartbeat() }, this.WEB_SOCKET_PING_INTERVAL);
-  }
-
-  /** Handles onreadystatechange response as a stream. */
-  private _handleStream(httpRequest: HttpRequest) {
-    // 3 and 4 are loading/done state. All others are not relevant.
-    const messages = httpRequest.getMessages();
-
-    // Check to see if anything needs to be processed on buffer.
-
-    while (httpRequest.hasBufferedIndices()) {
-      const index = httpRequest.popBufferedIndex();
-      let bufferedMessage = messages[index];
+    const message = JSON.stringify({ type: ServerMessageType.Heartbeat });
 
-      try {
-        bufferedMessage = JSON.parse(bufferedMessage);
-      } catch (e) {
-        //TODO should we need to put it back?
-        httpRequest.buffer.unshift(index);
-        break;
-      }
-
-      this.emit(SocketEventType.Message, bufferedMessage);
-    }
-
-    let message = messages[httpRequest.index];
-
-    if (message) {
-      httpRequest.index += 1;
-      // Buffering--this message is incomplete and we'll get to it next time.
-      // This checks if the httpResponse ended in a `\n`, in which case the last
-      // element of messages should be the empty string.
-      if (httpRequest.index === messages.length) {
-        httpRequest.pushIndexToBuffer(httpRequest.index - 1);
-      } else {
-        try {
-          message = JSON.parse(message);
-        } catch (e) {
-          util.log("Invalid server message", message);
-          return;
-        }
-        this.emit(SocketEventType.Message, message);
-      }
-    }
-  }
+    this._socket.send(message);
 
-  private _setHTTPTimeout() {
-    this._timeout = setTimeout(() => {
-      if (!this._wsOpen()) {
-        const oldHttp = this._httpRequest;
-        this._startXhrStream(oldHttp.streamIndex + 1);
-        this._httpRequest.previousRequest = oldHttp;
-      } else {
-        this._httpRequest.abort();
-        this._httpRequest = null;
-      }
-    }, this.HTTP_TIMEOUT);
+    this._scheduleHeartbeat();
   }
 
   /** Is the websocket currently open? */
@@ -274,7 +104,6 @@ export class Socket extends EventEmitter {
 
   /** Send queued messages. */
   private _sendQueuedMessages(): void {
-    //TODO is it ok?
     //Create copy of queue and clear it,
     //because send method push the message back to queue if smth will go wrong
     const copiedQueue = [...this._messagesQueue];
@@ -303,17 +132,13 @@ export class Socket extends EventEmitter {
       return;
     }
 
+    if (!this._wsOpen()) {
+      return;
+    }
+
     const message = JSON.stringify(data);
 
-    if (this._wsOpen()) {
-      this._socket.send(message);
-    } else {
-      const http = new XMLHttpRequest();
-      const url = this._httpUrl + "/" + data.type.toLowerCase();
-      http.open("post", url, true);
-      http.setRequestHeader("Content-Type", "application/json");
-      http.send(message);
-    }
+    this._socket.send(message);
   }
 
   close(): void {