123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- import { util } from "./util";
- import { EventEmitter } from "eventemitter3";
- import { SocketEventType } from "./enums";
- class HttpRequest {
- index = 1;
- readonly buffer: number[] = [];
- previousRequest: HttpRequest;
- private _xmlHttpRequest = new XMLHttpRequest();
- private _onError = sender => {};
- private _onSuccess = sender => {};
- set onError(handler) {
- this._onError = handler;
- }
- set onSuccess(handler) {
- this._onSuccess = handler;
- }
- constructor(readonly streamIndex: number, private readonly _httpUrl: string) {
- const self = this;
- this._xmlHttpRequest.onerror = () => {
- self._onError(self);
- };
- this._xmlHttpRequest.onreadystatechange = () => {
- if (self.needsClearPreviousRequest()) {
- self.clearPreviousRequest();
- } else if (self.isSuccess()) {
- self._onSuccess(self);
- }
- };
- }
- send(): void {
- this._xmlHttpRequest.open(
- "post",
- this._httpUrl + "/id?i=" + this.streamIndex,
- true
- );
- this._xmlHttpRequest.send(null);
- }
- abort(): void {
- this._xmlHttpRequest.abort();
- this._xmlHttpRequest = null;
- }
- isSuccess(): boolean {
- return (
- this._xmlHttpRequest.readyState > 2 &&
- this._xmlHttpRequest.status === 200 &&
- !!this._xmlHttpRequest.responseText
- );
- }
- needsClearPreviousRequest(): boolean {
- return this._xmlHttpRequest.readyState === 2 && !!this.previousRequest;
- }
- clearPreviousRequest(): void {
- if (!this.previousRequest) return;
- this.previousRequest.abort();
- this.previousRequest = null;
- }
- getMessages(): string[] {
- return this._xmlHttpRequest.responseText.split("\n");
- }
- hasBufferedIndices(): boolean {
- return this.buffer.length > 0;
- }
- popBufferedIndex(): number {
- return this.buffer.shift();
- }
- pushIndexToBuffer(index: number) {
- this.buffer.push(index);
- }
- }
- /**
- * An abstraction on top of WebSockets and XHR streaming to provide fastest
- * possible connection for peers.
- */
- export class Socket extends EventEmitter {
- private readonly HTTP_TIMEOUT = 25000;
- private _disconnected = false;
- private _id: string;
- private _messagesQueue: Array<any> = [];
- private _httpUrl: string;
- private _wsUrl: string;
- private _socket: WebSocket;
- private _httpRequest: HttpRequest;
- private _timeout: any;
- constructor(
- secure: any,
- host: string,
- port: number,
- path: string,
- key: string,
- wsport = port
- ) {
- super();
- const httpProtocol = secure ? "https://" : "http://";
- const wsProtocol = secure ? "wss://" : "ws://";
- this._httpUrl = httpProtocol + host + ":" + port + path + key;
- this._wsUrl = wsProtocol + host + ":" + wsport + path + "peerjs?key=" + key;
- }
- /** Check in with ID or get one from server. */
- start(id: string, token: string): void {
- this._id = id;
- this._httpUrl += "/" + id + "/" + token;
- this._wsUrl += "&id=" + id + "&token=" + token;
- this._startXhrStream();
- this._startWebSocket();
- }
- /** Start up websocket communications. */
- private _startWebSocket(): void {
- if (this._socket) {
- return;
- }
- this._socket = new WebSocket(this._wsUrl);
- const self = this;
- this._socket.onmessage = function(event) {
- let data;
- try {
- data = JSON.parse(event.data);
- } catch (e) {
- util.log("Invalid server message", event.data);
- return;
- }
- self.emit(SocketEventType.Message, data);
- };
- this._socket.onclose = function(event) {
- util.log("Socket closed.");
- self._disconnected = true;
- self.emit(SocketEventType.Disconnected);
- };
- // Take care of the queue of connections if necessary and make sure Peer knows
- // socket is open.
- this._socket.onopen = function() {
- if (self._timeout) {
- clearTimeout(self._timeout);
- setTimeout(function() {
- self._httpRequest.abort();
- self._httpRequest = null;
- }, 5000);
- }
- self._sendQueuedMessages();
- util.log("Socket open");
- };
- }
- /** Start XHR streaming. */
- private _startXhrStream(streamIndex: number = 0) {
- const newRequest = new HttpRequest(streamIndex, this._httpUrl);
- this._httpRequest = newRequest;
- newRequest.onError = () => {
- // If we get an error, likely something went wrong.
- // Stop streaming.
- clearTimeout(this._timeout);
- this.emit(SocketEventType.Disconnected);
- };
- newRequest.onSuccess = () => {
- this._handleStream(newRequest);
- };
- try {
- newRequest.send();
- this._setHTTPTimeout();
- } catch (e) {
- util.log("XMLHttpRequest not available; defaulting to WebSockets");
- }
- }
- /** Handles onreadystatechange response as a stream. */
- private _handleStream(httpRequest: HttpRequest) {
- // 3 and 4 are loading/done state. All others are not relevant.
- const messages = httpRequest.getMessages();
- // Check to see if anything needs to be processed on buffer.
- while (httpRequest.hasBufferedIndices()) {
- const index = httpRequest.popBufferedIndex();
- let bufferedMessage = messages[index];
- try {
- bufferedMessage = JSON.parse(bufferedMessage);
- } catch (e) {
- //TODO should we need to put it back?
- httpRequest.buffer.unshift(index);
- break;
- }
- this.emit(SocketEventType.Message, bufferedMessage);
- }
- let message = messages[httpRequest.index];
- if (message) {
- httpRequest.index += 1;
- // Buffering--this message is incomplete and we'll get to it next time.
- // This checks if the httpResponse ended in a `\n`, in which case the last
- // element of messages should be the empty string.
- if (httpRequest.index === messages.length) {
- httpRequest.pushIndexToBuffer(httpRequest.index - 1);
- } else {
- try {
- message = JSON.parse(message);
- } catch (e) {
- util.log("Invalid server message", message);
- return;
- }
- this.emit(SocketEventType.Message, message);
- }
- }
- }
- private _setHTTPTimeout() {
- const self = this;
- this._timeout = setTimeout(function() {
- if (!self._wsOpen()) {
- const oldHttp = self._httpRequest;
- self._startXhrStream(oldHttp.streamIndex + 1);
- self._httpRequest.previousRequest = oldHttp;
- } else {
- self._httpRequest.abort();
- self._httpRequest = null;
- }
- }, this.HTTP_TIMEOUT);
- }
- /** Is the websocket currently open? */
- private _wsOpen(): boolean {
- return this._socket && this._socket.readyState == 1;
- }
- /** Send queued messages. */
- private _sendQueuedMessages(): void {
- //TODO is it ok?
- //Create copy of queue and clear it,
- //because send method push the message back to queue if smth will go wrong
- const copiedQueue = [...this._messagesQueue];
- this._messagesQueue = [];
- for (const message of copiedQueue) {
- this.send(message);
- }
- }
- /** Exposed send for DC & Peer. */
- send(data: any): void {
- if (this._disconnected) {
- return;
- }
- // If we didn't get an ID yet, we can't yet send anything so we should queue
- // up these messages.
- if (!this._id) {
- this._messagesQueue.push(data);
- return;
- }
- if (!data.type) {
- this.emit(SocketEventType.Error, "Invalid message");
- return;
- }
- const message = JSON.stringify(data);
- if (this._wsOpen()) {
- this._socket.send(message);
- } else {
- const http = new XMLHttpRequest();
- const url = this._httpUrl + "/" + data.type.toLowerCase();
- http.open("post", url, true);
- http.setRequestHeader("Content-Type", "application/json");
- http.send(message);
- }
- }
- close(): void {
- if (!this._disconnected && this._wsOpen()) {
- this._socket.close();
- this._disconnected = true;
- }
- }
- }
|