Bläddra i källkod

refactor: more concrete types for `ServerMessage` and simpler type for `Negotiator`

Jonas Gloning 1 år sedan
förälder
incheckning
dd04698289
8 ändrade filer med 153 tillägg och 66 borttagningar
  1. 2 2
      lib/baseconnection.ts
  2. 6 8
      lib/dataconnection/DataConnection.ts
  3. 6 7
      lib/mediaconnection.ts
  4. 30 30
      lib/negotiator.ts
  5. 16 9
      lib/peer.ts
  6. 82 0
      lib/serverMessages.ts
  7. 0 7
      lib/servermessage.ts
  8. 11 3
      lib/socket.ts

+ 2 - 2
lib/baseconnection.ts

@@ -1,5 +1,5 @@
 import type { Peer } from "./peer";
-import type { ServerMessage } from "./servermessage";
+import type { IncomingServerMessage } from "./serverMessages";
 import type { ConnectionType } from "./enums";
 import { BaseConnectionErrorType } from "./enums";
 import {
@@ -81,7 +81,7 @@ export abstract class BaseConnection<
 	/**
 	 * @internal
 	 */
-	abstract handleMessage(message: ServerMessage): void;
+	abstract handleMessage(message: IncomingServerMessage): void;
 
 	/**
 	 * Called by the Negotiator when the DataChannel is ready.

+ 6 - 8
lib/dataconnection/DataConnection.ts

@@ -8,7 +8,7 @@ import {
 } from "../enums";
 import type { Peer } from "../peer";
 import { BaseConnection, type BaseConnectionEvents } from "../baseconnection";
-import type { ServerMessage } from "../servermessage";
+import type { IncomingServerMessage } from "../serverMessages";
 import type { EventsWithError } from "../peerError";
 import { randomToken } from "../utils/randomToken";
 
@@ -35,11 +35,11 @@ export abstract class DataConnection extends BaseConnection<
 	protected static readonly ID_PREFIX = "dc_";
 	protected static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
 
-	private _negotiator: Negotiator<DataConnectionEvents, this>;
+	private _negotiator: Negotiator;
 	abstract readonly serialization: string;
 	readonly reliable: boolean;
 
-	public get type() {
+	public get type(): ConnectionType.Data {
 		return ConnectionType.Data;
 	}
 
@@ -138,15 +138,13 @@ export abstract class DataConnection extends BaseConnection<
 		return this._send(data, chunked);
 	}
 
-	async handleMessage(message: ServerMessage) {
-		const payload = message.payload;
-
+	async handleMessage(message: IncomingServerMessage) {
 		switch (message.type) {
 			case ServerMessageType.Answer:
-				await this._negotiator.handleSDP(message.type, payload.sdp);
+				await this._negotiator.handleSDP(message.type, message.payload.sdp);
 				break;
 			case ServerMessageType.Candidate:
-				await this._negotiator.handleCandidate(payload.candidate);
+				await this._negotiator.handleCandidate(message.payload.candidate);
 				break;
 			default:
 				logger.warn(

+ 6 - 7
lib/mediaconnection.ts

@@ -4,7 +4,7 @@ import { Negotiator } from "./negotiator";
 import { ConnectionType, ServerMessageType } from "./enums";
 import type { Peer } from "./peer";
 import { BaseConnection, type BaseConnectionEvents } from "./baseconnection";
-import type { ServerMessage } from "./servermessage";
+import type { IncomingServerMessage } from "./serverMessages";
 import type { AnswerOption } from "./optionInterfaces";
 
 export interface MediaConnectionEvents extends BaseConnectionEvents<never> {
@@ -32,14 +32,14 @@ export class MediaConnection extends BaseConnection<MediaConnectionEvents> {
 	private static readonly ID_PREFIX = "mc_";
 	readonly label: string;
 
-	private _negotiator: Negotiator<MediaConnectionEvents, this>;
+	private _negotiator: Negotiator;
 	private _localStream: MediaStream;
 	private _remoteStream: MediaStream;
 
 	/**
 	 * For media connections, this is always 'media'.
 	 */
-	get type() {
+	get type(): ConnectionType.Media {
 		return ConnectionType.Media;
 	}
 
@@ -93,18 +93,17 @@ export class MediaConnection extends BaseConnection<MediaConnectionEvents> {
 	/**
 	 * @internal
 	 */
-	handleMessage(message: ServerMessage): void {
+	handleMessage(message: IncomingServerMessage): void {
 		const type = message.type;
-		const payload = message.payload;
 
 		switch (message.type) {
 			case ServerMessageType.Answer:
 				// Forward to negotiator
-				void this._negotiator.handleSDP(type, payload.sdp);
+				void this._negotiator.handleSDP(type, message.payload.sdp);
 				this._open = true;
 				break;
 			case ServerMessageType.Candidate:
-				void this._negotiator.handleCandidate(payload.candidate);
+				void this._negotiator.handleCandidate(message.payload.candidate);
 				break;
 			default:
 				logger.warn(`Unrecognized message type:${type} from peer:${this.peer}`);

+ 30 - 30
lib/negotiator.ts

@@ -7,17 +7,13 @@ import {
 	PeerErrorType,
 	ServerMessageType,
 } from "./enums";
-import type { BaseConnection, BaseConnectionEvents } from "./baseconnection";
-import type { ValidEventTypes } from "eventemitter3";
+import { BaseConnection } from "./baseconnection";
 
 /**
  * Manages all negotiations between Peers.
  */
-export class Negotiator<
-	Events extends ValidEventTypes,
-	ConnectionType extends BaseConnection<Events | BaseConnectionEvents>,
-> {
-	constructor(readonly connection: ConnectionType) {}
+export class Negotiator {
+	constructor(readonly connection: DataConnection | MediaConnection) {}
 
 	/** Returns a PeerConnection object set up correctly (for data, media). */
 	startConnection(options: any) {
@@ -119,8 +115,7 @@ export class Negotiator<
 					peerConnection.onicecandidate = () => {};
 					break;
 			}
-
-			this.connection.emit(
+			(this.connection as BaseConnection<{}>).emit(
 				"iceStateChanged",
 				peerConnection.iceConnectionState,
 			);
@@ -219,29 +214,34 @@ export class Negotiator<
 					`for:${this.connection.peer}`,
 				);
 
-				let payload: any = {
-					sdp: offer,
-					type: this.connection.type,
-					connectionId: this.connection.connectionId,
-					metadata: this.connection.metadata,
-				};
-
-				if (this.connection.type === ConnectionType.Data) {
-					const dataConnection = <DataConnection>(<unknown>this.connection);
-
-					payload = {
-						...payload,
-						label: dataConnection.label,
-						reliable: dataConnection.reliable,
-						serialization: dataConnection.serialization,
+				if (this.connection.type === ConnectionType.Media) {
+					const payload = {
+						sdp: offer,
+						type: this.connection.type,
+						connectionId: this.connection.connectionId,
+						metadata: this.connection.metadata,
+					};
+					provider.socket.send({
+						type: ServerMessageType.Offer,
+						dst: this.connection.peer,
+						payload,
+					});
+				} else {
+					const payload = {
+						sdp: offer,
+						type: this.connection.type,
+						connectionId: this.connection.connectionId,
+						metadata: this.connection.metadata,
+						label: this.connection.label,
+						reliable: this.connection.reliable,
+						serialization: this.connection.serialization,
 					};
+					provider.socket.send({
+						type: ServerMessageType.Offer,
+						dst: this.connection.peer,
+						payload,
+					});
 				}
-
-				provider.socket.send({
-					type: ServerMessageType.Offer,
-					payload,
-					dst: this.connection.peer,
-				});
 			} catch (err) {
 				// TODO: investigate why _makeOffer is being called from the answer
 				if (

+ 16 - 9
lib/peer.ts

@@ -9,7 +9,7 @@ import {
 	ServerMessageType,
 	SocketEventType,
 } from "./enums";
-import type { ServerMessage } from "./servermessage";
+import type { IncomingServerMessage } from "./serverMessages";
 import { API } from "./api";
 import type {
 	CallOption,
@@ -136,7 +136,8 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 		string,
 		(DataConnection | MediaConnection)[]
 	> = new Map(); // All connections for this peer.
-	private readonly _lostMessages: Map<string, ServerMessage[]> = new Map(); // src => [list of messages]
+	private readonly _lostMessages: Map<string, IncomingServerMessage[]> =
+		new Map(); // src => [list of messages]
 	/**
 	 * The brokering ID of this peer
 	 *
@@ -308,7 +309,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 			this._options.pingInterval,
 		);
 
-		socket.on(SocketEventType.Message, (data: ServerMessage) => {
+		socket.on(SocketEventType.Message, (data) => {
 			this._handleMessage(data);
 		});
 
@@ -346,10 +347,8 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 	}
 
 	/** Handles messages from the server. */
-	private _handleMessage(message: ServerMessage): void {
+	private _handleMessage(message: IncomingServerMessage): void {
 		const type = message.type;
-		const payload = message.payload;
-		const peerId = message.src;
 
 		switch (type) {
 			case ServerMessageType.Open: // The connection to the server is open.
@@ -358,7 +357,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 				this.emit("open", this.id);
 				break;
 			case ServerMessageType.Error: // Server error.
-				this._abort(PeerErrorType.ServerError, payload.msg);
+				this._abort(PeerErrorType.ServerError, message.payload.msg);
 				break;
 			case ServerMessageType.IdTaken: // The selected ID is taken.
 				this._abort(PeerErrorType.UnavailableID, `ID "${this.id}" is taken`);
@@ -370,6 +369,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 				);
 				break;
 			case ServerMessageType.Leave: // Another peer has closed its connection to this peer.
+				const peerId = message.src;
 				logger.log(`Received leave message from ${peerId}`);
 				this._cleanupPeer(peerId);
 				this._connections.delete(peerId);
@@ -381,6 +381,8 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 				);
 				break;
 			case ServerMessageType.Offer: {
+				const payload = message.payload;
+
 				// we should consider switching this to CALL/CONNECT, but this is the least breaking option.
 				const connectionId = payload.connectionId;
 				let connection = this.getConnection(peerId, connectionId);
@@ -420,6 +422,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 					this._addConnection(peerId, connection);
 					this.emit("connection", dataConnection);
 				} else {
+					// @ts-expect-error payload type differs from specifications
 					logger.warn(`Received malformed connection type:${payload.type}`);
 					return;
 				}
@@ -433,6 +436,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 				break;
 			}
 			default: {
+				const payload = message.payload;
 				if (!payload) {
 					logger.warn(
 						`You received a malformed message from ${peerId} of type ${type}`,
@@ -458,7 +462,10 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 	}
 
 	/** Stores messages without a set up connection, to be claimed later. */
-	private _storeMessage(connectionId: string, message: ServerMessage): void {
+	private _storeMessage(
+		connectionId: string,
+		message: IncomingServerMessage,
+	): void {
 		if (!this._lostMessages.has(connectionId)) {
 			this._lostMessages.set(connectionId, []);
 		}
@@ -471,7 +478,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
 	 * @internal
 	 */
 	//TODO Change it to private
-	public _getMessages(connectionId: string): ServerMessage[] {
+	public _getMessages(connectionId: string) {
 		const messages = this._lostMessages.get(connectionId);
 
 		if (messages) {

+ 82 - 0
lib/serverMessages.ts

@@ -0,0 +1,82 @@
+import type { ServerMessageType } from "./enums";
+import { ConnectionType } from "./enums";
+
+export type OutgoingServerMessage = (
+	| OfferMessage
+	| AnswerMessage
+	| CandidateMessage
+	| HeartbeatMessage
+) & { dst: string };
+
+export type IncomingServerMessage =
+	| OpenMessage
+	| ErrorMessage
+	| IdTakenMessage
+	| InvalidKeyMessage
+	| ((
+			| AnswerMessage
+			| OfferMessage
+			| CandidateMessage
+			| LeaveMessage
+			| ExpireMessage
+	  ) & { src: string });
+
+interface ServerMessage<
+	Type extends ServerMessageType,
+	Payload extends object = never,
+> {
+	type: Type;
+	payload: Payload;
+}
+interface Offer {
+	sdp: RTCSessionDescriptionInit;
+	type: ConnectionType;
+	connectionId: string;
+	metadata: unknown;
+}
+interface MediaConnectionOffer extends Offer {
+	type: ConnectionType.Media;
+}
+interface DataConnectionOffer extends Offer {
+	type: ConnectionType.Data;
+	label: string;
+	serialization: string;
+	reliable: boolean;
+}
+
+interface OfferMessage
+	extends ServerMessage<
+		ServerMessageType.Offer,
+		DataConnectionOffer | MediaConnectionOffer
+	> {}
+interface AnswerMessage
+	extends ServerMessage<ServerMessageType.Answer, Omit<Offer, "metadata">> {}
+export interface OpenMessage {
+	type: ServerMessageType.Open;
+	dst: string;
+}
+interface CandidateMessage
+	extends ServerMessage<
+		ServerMessageType.Candidate,
+		{
+			candidate: RTCIceCandidate;
+			type: ConnectionType;
+			connectionId: string;
+		}
+	> {}
+interface LeaveMessage extends ServerMessage<ServerMessageType.Leave> {}
+interface ExpireMessage extends ServerMessage<ServerMessageType.Expire> {}
+interface ErrorMessage
+	extends ServerMessage<
+		ServerMessageType.Error,
+		{
+			msg: string;
+		}
+	> {}
+
+interface HeartbeatMessage extends ServerMessage<ServerMessageType.Heartbeat> {}
+
+interface IdTakenMessage extends ServerMessage<ServerMessageType.IdTaken> {}
+
+interface InvalidKeyMessage
+	extends ServerMessage<ServerMessageType.InvalidKey> {}

+ 0 - 7
lib/servermessage.ts

@@ -1,7 +0,0 @@
-import type { ServerMessageType } from "./enums";
-
-export class ServerMessage {
-	type: ServerMessageType;
-	payload: any;
-	src: string;
-}

+ 11 - 3
lib/socket.ts

@@ -2,15 +2,23 @@ import { EventEmitter } from "eventemitter3";
 import logger from "./logger";
 import { ServerMessageType, SocketEventType } from "./enums";
 import { version } from "../package.json";
+import { IncomingServerMessage, OutgoingServerMessage } from "./serverMessages";
+
+interface SocketEvents {
+	[SocketEventType.Disconnected]: () => void;
+	[SocketEventType.Error]: (error: "Invalid message") => void;
+	[SocketEventType.Message]: (message: IncomingServerMessage) => void;
+	[SocketEventType.Close]: () => void;
+}
 
 /**
  * An abstraction on top of WebSockets to provide fastest
  * possible connection for peers.
  */
-export class Socket extends EventEmitter {
+export class Socket extends EventEmitter<SocketEvents, never> {
 	private _disconnected: boolean = true;
 	private _id?: string;
-	private _messagesQueue: Array<object> = [];
+	private _messagesQueue: Array<OutgoingServerMessage> = [];
 	private _socket?: WebSocket;
 	private _wsPingTimer?: any;
 	private readonly _baseUrl: string;
@@ -121,7 +129,7 @@ export class Socket extends EventEmitter {
 	}
 
 	/** Exposed send for DC & Peer. */
-	send(data: any): void {
+	send(data: OutgoingServerMessage): void {
 		if (this._disconnected) {
 			return;
 		}