Browse Source

refactor: Custom `DataConnection` classes can be plugged-in at runtime

Jonas Gloning 1 năm trước cách đây
mục cha
commit
b6d3fdd759

+ 2 - 2
lib/baseconnection.ts

@@ -34,14 +34,14 @@ export abstract class BaseConnection<
 	connectionId: string;
 
 	peerConnection: RTCPeerConnection;
-	abstract get dataChannel(): RTCDataChannel;
+	dataChannel: RTCDataChannel;
 
 	abstract get type(): ConnectionType;
 
 	/**
 	 * The optional label passed in or assigned by PeerJS when the connection was initiated.
 	 */
-	abstract readonly label: string;
+	label: string;
 
 	/**
 	 * Whether the media connection is active (e.g. your call has been answered).

+ 0 - 396
lib/dataconnection.ts

@@ -1,396 +0,0 @@
-import { concatArrayBuffers, util } from "./util";
-import logger from "./logger";
-import { Negotiator } from "./negotiator";
-import { ConnectionType, SerializationType, ServerMessageType } from "./enums";
-import { Peer } from "./peer";
-import { BaseConnection } from "./baseconnection";
-import { ServerMessage } from "./servermessage";
-import { EncodingQueue } from "./encodingQueue";
-import type { DataConnection as IDataConnection } from "./dataconnection";
-
-export type DataConnectionEvents = {
-	/**
-	 * Emitted when data is received from the remote peer.
-	 *
-	 * ```ts
-	 * dataConnection.on('data', (data) => { ... });
-	 * ```
-	 */
-	data: (data: unknown) => void;
-	/**
-	 * Emitted when the connection is established and ready-to-use.
-	 *
-	 * ```ts
-	 * dataConnection.on('open', () => { ... });
-	 * ```
-	 */
-	open: () => void;
-};
-
-/**
- * Wraps WebRTC's DataChannel.
- * To get one, use {@apilink Peer.connect} or listen for the {@apilink PeerEvents | `connect`} event.
- */
-export class DataConnection
-	extends BaseConnection<DataConnectionEvents>
-	implements IDataConnection
-{
-	private static readonly ID_PREFIX = "dc_";
-	private static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
-
-	private _negotiator: Negotiator<DataConnectionEvents, DataConnection>;
-	readonly label: string;
-
-	/**
-	 * The serialization format of the data sent over the connection.
-	 * {@apilink SerializationType | possible values}
-	 */
-	readonly serialization: SerializationType;
-	/**
-	 * Whether the underlying data channels are reliable; defined when the connection was initiated.
-	 */
-	readonly reliable: boolean;
-	stringify: (data: any) => string = JSON.stringify;
-	parse: (data: string) => any = JSON.parse;
-
-	get type() {
-		return ConnectionType.Data;
-	}
-
-	private _buffer: any[] = [];
-	/**
-	 * The number of messages queued to be sent once the browser buffer is no longer full.
-	 */
-	private _bufferSize = 0;
-	private _buffering = false;
-	private _chunkedData: {
-		[id: number]: {
-			data: Uint8Array[];
-			count: number;
-			total: number;
-		};
-	} = {};
-
-	private _dc: RTCDataChannel;
-	private _encodingQueue = new EncodingQueue();
-
-	/**
-	 * A reference to the RTCDataChannel object associated with the connection.
-	 */
-	get dataChannel(): RTCDataChannel {
-		return this._dc;
-	}
-
-	get bufferSize(): number {
-		return this._bufferSize;
-	}
-
-	constructor(peerId: string, provider: Peer, options: any) {
-		super(peerId, provider, options);
-
-		this.connectionId =
-			this.options.connectionId ||
-			DataConnection.ID_PREFIX + util.randomToken();
-
-		this.label = this.options.label || this.connectionId;
-		this.serialization = this.options.serialization || SerializationType.Binary;
-		this.reliable = !!this.options.reliable;
-
-		this._encodingQueue.on("done", (ab: ArrayBuffer) => {
-			this._bufferedSend(ab);
-		});
-
-		this._encodingQueue.on("error", () => {
-			logger.error(
-				`DC#${this.connectionId}: Error occured in encoding from blob to arraybuffer, close DC`,
-			);
-			this.close();
-		});
-
-		this._negotiator = new Negotiator(this);
-
-		this._negotiator.startConnection(
-			this.options._payload || {
-				originator: true,
-			},
-		);
-	}
-
-	/** Called by the Negotiator when the DataChannel is ready. */
-	override _initializeDataChannel(dc: RTCDataChannel): void {
-		this._dc = dc;
-		this.dataChannel.binaryType = "arraybuffer";
-
-		this.dataChannel.onopen = () => {
-			logger.log(`DC#${this.connectionId} dc connection success`);
-			this._open = true;
-			this.emit("open");
-		};
-
-		this.dataChannel.onmessage = (e) => {
-			logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
-			this._handleDataMessage(e);
-		};
-
-		this.dataChannel.onclose = () => {
-			logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
-			this.close();
-		};
-	}
-
-	// Handles a DataChannel message.
-	private _handleDataMessage({
-		data,
-	}: {
-		data: Blob | ArrayBuffer | string;
-	}): void {
-		const datatype = data.constructor;
-
-		const isBinarySerialization =
-			this.serialization === SerializationType.Binary ||
-			this.serialization === SerializationType.BinaryUTF8;
-
-		let deserializedData: any = data;
-
-		if (isBinarySerialization) {
-			if (datatype === Blob) {
-				// Datatype should never be blob
-				util.blobToArrayBuffer(data as Blob, (ab) => {
-					const unpackedData = util.unpack(ab);
-					this.emit("data", unpackedData);
-				});
-				return;
-			} else if (datatype === ArrayBuffer) {
-				deserializedData = util.unpack(data as ArrayBuffer);
-			} else if (datatype === String) {
-				// String fallback for binary data for browsers that don't support binary yet
-				const ab = util.binaryStringToArrayBuffer(data as string);
-				deserializedData = util.unpack(ab);
-			}
-		} else if (this.serialization === SerializationType.JSON) {
-			deserializedData = this.parse(data as string);
-		}
-
-		// PeerJS specific message
-		const peerData = deserializedData["__peerData"];
-		if (peerData) {
-			if (peerData.type === "close") {
-				this.close();
-				return;
-			}
-
-			// Chunked data -- piece things back together.
-			this._handleChunk(deserializedData);
-			return;
-		}
-
-		super.emit("data", deserializedData);
-	}
-
-	private _handleChunk(data: {
-		__peerData: number;
-		n: number;
-		total: number;
-		data: ArrayBuffer;
-	}): void {
-		const id = data.__peerData;
-		const chunkInfo = this._chunkedData[id] || {
-			data: [],
-			count: 0,
-			total: data.total,
-		};
-
-		chunkInfo.data[data.n] = new Uint8Array(data.data);
-		chunkInfo.count++;
-		this._chunkedData[id] = chunkInfo;
-
-		if (chunkInfo.total === chunkInfo.count) {
-			// Clean up before making the recursive call to `_handleDataMessage`.
-			delete this._chunkedData[id];
-
-			// We've received all the chunks--time to construct the complete data.
-			const data = concatArrayBuffers(chunkInfo.data);
-			this.emit("data", util.unpack(data));
-		}
-	}
-
-	/**
-	 * Exposed functionality for users.
-	 */
-
-	/** Allows user to close connection. */
-	close(options?: { flush?: boolean }) {
-		if (options?.flush) {
-			this.send({
-				__peerData: {
-					type: "close",
-				},
-			});
-			return;
-		}
-		this._buffer = [];
-		this._bufferSize = 0;
-		this._chunkedData = {};
-
-		if (this._negotiator) {
-			this._negotiator.cleanup();
-			this._negotiator = null;
-		}
-
-		if (this.provider) {
-			this.provider._removeConnection(this);
-
-			this.provider = null;
-		}
-
-		if (this.dataChannel) {
-			this.dataChannel.onopen = null;
-			this.dataChannel.onmessage = null;
-			this.dataChannel.onclose = null;
-			this._dc = null;
-		}
-
-		if (this._encodingQueue) {
-			this._encodingQueue.destroy();
-			this._encodingQueue.removeAllListeners();
-			this._encodingQueue = null;
-		}
-
-		if (!this.open) {
-			return;
-		}
-
-		this._open = false;
-
-		super.emit("close");
-	}
-
-	/**
-	 * `data` is serialized and sent to the remote peer.
-	 * @param data You can send any type of data, including objects, strings, and blobs.
-	 * @returns
-	 */
-	send(data: any, chunked?: boolean): void {
-		if (!this.open) {
-			super.emit(
-				"error",
-				new Error(
-					"Connection is not open. You should listen for the `open` event before sending messages.",
-				),
-			);
-			return;
-		}
-
-		if (data instanceof Blob) {
-			data.arrayBuffer().then((ab) => this.send(ab));
-			return;
-		}
-
-		if (this.serialization === SerializationType.JSON) {
-			this._bufferedSend(this.stringify(data));
-		} else if (
-			this.serialization === SerializationType.Binary ||
-			this.serialization === SerializationType.BinaryUTF8
-		) {
-			const blob = util.pack(data);
-
-			if (!chunked && blob.byteLength > util.chunkedMTU) {
-				this._sendChunks(blob);
-				return;
-			}
-
-			this._bufferedSend(blob);
-		} else {
-			this._bufferedSend(data);
-		}
-	}
-
-	private _bufferedSend(msg: any): void {
-		if (this._buffering || !this._trySend(msg)) {
-			this._buffer.push(msg);
-			this._bufferSize = this._buffer.length;
-		}
-	}
-
-	// Returns true if the send succeeds.
-	private _trySend(msg: any): boolean {
-		if (!this.open) {
-			return false;
-		}
-
-		if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
-			this._buffering = true;
-			setTimeout(() => {
-				this._buffering = false;
-				this._tryBuffer();
-			}, 50);
-
-			return false;
-		}
-
-		try {
-			this.dataChannel.send(msg);
-		} catch (e) {
-			logger.error(`DC#:${this.connectionId} Error when sending:`, e);
-			this._buffering = true;
-
-			this.close();
-
-			return false;
-		}
-
-		return true;
-	}
-
-	// Try to send the first message in the buffer.
-	private _tryBuffer(): void {
-		if (!this.open) {
-			return;
-		}
-
-		if (this._buffer.length === 0) {
-			return;
-		}
-
-		const msg = this._buffer[0];
-
-		if (this._trySend(msg)) {
-			this._buffer.shift();
-			this._bufferSize = this._buffer.length;
-			this._tryBuffer();
-		}
-	}
-
-	private _sendChunks(blob: ArrayBuffer): void {
-		const blobs = util.chunk(blob);
-		logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
-
-		for (const blob of blobs) {
-			this.send(blob, true);
-		}
-	}
-
-	/**
-	 * @internal
-	 */
-	handleMessage(message: ServerMessage): void {
-		const payload = message.payload;
-
-		switch (message.type) {
-			case ServerMessageType.Answer:
-				this._negotiator.handleSDP(message.type, payload.sdp);
-				break;
-			case ServerMessageType.Candidate:
-				this._negotiator.handleCandidate(payload.candidate);
-				break;
-			default:
-				logger.warn(
-					"Unrecognized message type:",
-					message.type,
-					"from peer:",
-					this.peer,
-				);
-				break;
-		}
-	}
-}

+ 99 - 0
lib/dataconnection/BufferedConnection/BinaryJSConnection.ts

@@ -0,0 +1,99 @@
+import { concatArrayBuffers, util } from "../../util";
+import logger from "../../logger";
+import { Peer } from "../../peer";
+import { BufferedConnection } from "./BufferedConnection";
+import { SerializationType } from "../../enums";
+import { Packable, pack, unpack } from "peerjs-js-binarypack";
+
+export class BinaryJSConnection extends BufferedConnection {
+	readonly serialization = SerializationType.Binary;
+
+	private _chunkedData: {
+		[id: number]: {
+			data: Uint8Array[];
+			count: number;
+			total: number;
+		};
+	} = {};
+
+	public override close(options?: { flush?: boolean }) {
+		super.close(options);
+		this._chunkedData = {};
+	}
+
+	constructor(peerId: string, provider: Peer, options: any) {
+		super(peerId, provider, options);
+	}
+
+	// Handles a DataChannel message.
+	protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
+		let deserializedData = unpack(data);
+
+		// PeerJS specific message
+		const peerData = deserializedData["__peerData"];
+		if (peerData) {
+			if (peerData.type === "close") {
+				this.close();
+				return;
+			}
+
+			// Chunked data -- piece things back together.
+			// @ts-ignore
+			this._handleChunk(deserializedData);
+			return;
+		}
+
+		this.emit("data", deserializedData);
+	}
+
+	private _handleChunk(data: {
+		__peerData: number;
+		n: number;
+		total: number;
+		data: ArrayBuffer;
+	}): void {
+		const id = data.__peerData;
+		const chunkInfo = this._chunkedData[id] || {
+			data: [],
+			count: 0,
+			total: data.total,
+		};
+
+		chunkInfo.data[data.n] = new Uint8Array(data.data);
+		chunkInfo.count++;
+		this._chunkedData[id] = chunkInfo;
+
+		if (chunkInfo.total === chunkInfo.count) {
+			// Clean up before making the recursive call to `_handleDataMessage`.
+			delete this._chunkedData[id];
+
+			// We've received all the chunks--time to construct the complete data.
+			// const data = new Blob(chunkInfo.data);
+			const data = concatArrayBuffers(chunkInfo.data);
+			this._handleDataMessage({ data });
+		}
+	}
+
+	protected override _send(
+		data: Packable,
+		chunked: boolean,
+	): void | Promise<void> {
+		const blob = pack(data);
+
+		if (!chunked && blob.byteLength > util.chunkedMTU) {
+			this._sendChunks(blob);
+			return;
+		}
+
+		this._bufferedSend(blob);
+	}
+
+	private _sendChunks(blob: ArrayBuffer) {
+		const blobs = util.chunk(blob);
+		logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
+
+		for (let blob of blobs) {
+			this.send(blob, true);
+		}
+	}
+}

+ 92 - 0
lib/dataconnection/BufferedConnection/BufferedConnection.ts

@@ -0,0 +1,92 @@
+import logger from "../../logger";
+import { DataConnection } from "../DataConnection";
+
+export abstract class BufferedConnection extends DataConnection {
+	private _buffer: any[] = [];
+	private _bufferSize = 0;
+	private _buffering = false;
+
+	public get bufferSize(): number {
+		return this._bufferSize;
+	}
+
+	public override _initializeDataChannel(dc: RTCDataChannel) {
+		super._initializeDataChannel(dc);
+		this.dataChannel.binaryType = "arraybuffer";
+		this.dataChannel.addEventListener("message", (e) =>
+			this._handleDataMessage(e),
+		);
+	}
+
+	protected abstract _handleDataMessage(e: MessageEvent): void;
+
+	protected _bufferedSend(msg: ArrayBuffer): void {
+		if (this._buffering || !this._trySend(msg)) {
+			this._buffer.push(msg);
+			this._bufferSize = this._buffer.length;
+		}
+	}
+
+	// Returns true if the send succeeds.
+	private _trySend(msg: ArrayBuffer): boolean {
+		if (!this.open) {
+			return false;
+		}
+
+		if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
+			this._buffering = true;
+			setTimeout(() => {
+				this._buffering = false;
+				this._tryBuffer();
+			}, 50);
+
+			return false;
+		}
+
+		try {
+			this.dataChannel.send(msg);
+		} catch (e) {
+			logger.error(`DC#:${this.connectionId} Error when sending:`, e);
+			this._buffering = true;
+
+			this.close();
+
+			return false;
+		}
+
+		return true;
+	}
+
+	// Try to send the first message in the buffer.
+	private _tryBuffer(): void {
+		if (!this.open) {
+			return;
+		}
+
+		if (this._buffer.length === 0) {
+			return;
+		}
+
+		const msg = this._buffer[0];
+
+		if (this._trySend(msg)) {
+			this._buffer.shift();
+			this._bufferSize = this._buffer.length;
+			this._tryBuffer();
+		}
+	}
+
+	public override close(options?: { flush?: boolean }) {
+		if (options?.flush) {
+			this.send({
+				__peerData: {
+					type: "close",
+				},
+			});
+			return;
+		}
+		this._buffer = [];
+		this._bufferSize = 0;
+		super.close();
+	}
+}

+ 34 - 0
lib/dataconnection/BufferedConnection/JsonConnection.ts

@@ -0,0 +1,34 @@
+import { BufferedConnection } from "./BufferedConnection";
+import { SerializationType } from "../../enums";
+import { util } from "../../util";
+export class JsonConnection extends BufferedConnection {
+	readonly serialization = SerializationType.JSON;
+	private readonly encoder = new TextEncoder();
+	private readonly decoder = new TextDecoder();
+
+	stringify: (data: any) => string = JSON.stringify;
+	parse: (data: string) => any = JSON.parse;
+
+	// Handles a DataChannel message.
+	protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
+		let deserializedData = this.parse(this.decoder.decode(data));
+
+		// PeerJS specific message
+		const peerData = deserializedData["__peerData"];
+		if (peerData && peerData.type === "close") {
+			this.close();
+			return;
+		}
+
+		this.emit("data", deserializedData);
+	}
+
+	override _send(data, _chunked) {
+		const encodedData = this.encoder.encode(this.stringify(data));
+		if (encodedData.byteLength >= util.chunkedMTU) {
+			this.emit("error", new Error("Message too big for JSON channel"));
+			return;
+		}
+		this._bufferedSend(encodedData);
+	}
+}

+ 14 - 0
lib/dataconnection/BufferedConnection/RawConnection.ts

@@ -0,0 +1,14 @@
+import { BufferedConnection } from "./BufferedConnection";
+import { SerializationType } from "../../enums";
+
+export class RawConnection extends BufferedConnection {
+	readonly serialization = SerializationType.None;
+
+	protected _handleDataMessage({ data }) {
+		super.emit("data", data);
+	}
+
+	override _send(data, _chunked) {
+		this._bufferedSend(data);
+	}
+}

+ 158 - 0
lib/dataconnection/DataConnection.ts

@@ -0,0 +1,158 @@
+import { util } from "../util";
+import logger from "../logger";
+import { Negotiator } from "../negotiator";
+import { ConnectionType, ServerMessageType } from "../enums";
+import { Peer } from "../peer";
+import { BaseConnection } from "../baseconnection";
+import { ServerMessage } from "../servermessage";
+import type { DataConnection as IDataConnection } from "./DataConnection";
+
+type DataConnectionEvents = {
+	/**
+	 * Emitted when data is received from the remote peer.
+	 */
+	data: (data: unknown) => void;
+	/**
+	 * Emitted when the connection is established and ready-to-use.
+	 */
+	open: () => void;
+};
+
+/**
+ * Wraps a DataChannel between two Peers.
+ */
+export abstract class DataConnection
+	extends BaseConnection<DataConnectionEvents>
+	implements IDataConnection
+{
+	protected static readonly ID_PREFIX = "dc_";
+	protected static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
+
+	private _negotiator: Negotiator<DataConnectionEvents, this>;
+	abstract readonly serialization: string;
+	readonly reliable: boolean;
+
+	// public type: ConnectionType.Data;
+	public get type() {
+		return ConnectionType.Data;
+	}
+
+	constructor(peerId: string, provider: Peer, options: any) {
+		super(peerId, provider, options);
+
+		this.connectionId =
+			this.options.connectionId ||
+			DataConnection.ID_PREFIX + util.randomToken();
+
+		this.label = this.options.label || this.connectionId;
+		this.reliable = !!this.options.reliable;
+
+		this._negotiator = new Negotiator(this);
+
+		this._negotiator.startConnection(
+			this.options._payload || {
+				originator: true,
+				reliable: this.reliable,
+			},
+		);
+	}
+
+	/** Called by the Negotiator when the DataChannel is ready. */
+	override _initializeDataChannel(dc: RTCDataChannel): void {
+		this.dataChannel = dc;
+
+		this.dataChannel.onopen = () => {
+			logger.log(`DC#${this.connectionId} dc connection success`);
+			this._open = true;
+			this.emit("open");
+		};
+
+		this.dataChannel.onmessage = (e) => {
+			logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
+			// this._handleDataMessage(e);
+		};
+
+		this.dataChannel.onclose = () => {
+			logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
+			this.close();
+		};
+	}
+
+	/**
+	 * Exposed functionality for users.
+	 */
+
+	/** Allows user to close connection. */
+	close(options?: { flush?: boolean }): void {
+		if (options?.flush) {
+			this.send({
+				__peerData: {
+					type: "close",
+				},
+			});
+			return;
+		}
+		if (this._negotiator) {
+			this._negotiator.cleanup();
+			this._negotiator = null;
+		}
+
+		if (this.provider) {
+			this.provider._removeConnection(this);
+
+			this.provider = null;
+		}
+
+		if (this.dataChannel) {
+			this.dataChannel.onopen = null;
+			this.dataChannel.onmessage = null;
+			this.dataChannel.onclose = null;
+			this.dataChannel = null;
+		}
+
+		if (!this.open) {
+			return;
+		}
+
+		this._open = false;
+
+		super.emit("close");
+	}
+
+	protected abstract _send(data: any, chunked: boolean): void;
+
+	/** Allows user to send data. */
+	public send(data: any, chunked = false) {
+		if (!this.open) {
+			super.emit(
+				"error",
+				new Error(
+					"Connection is not open. You should listen for the `open` event before sending messages.",
+				),
+			);
+			return;
+		}
+		return this._send(data, chunked);
+	}
+
+	async handleMessage(message: ServerMessage) {
+		const payload = message.payload;
+
+		switch (message.type) {
+			case ServerMessageType.Answer:
+				await this._negotiator.handleSDP(message.type, payload.sdp);
+				break;
+			case ServerMessageType.Candidate:
+				await this._negotiator.handleCandidate(payload.candidate);
+				break;
+			default:
+				logger.warn(
+					"Unrecognized message type:",
+					message.type,
+					"from peer:",
+					this.peer,
+				);
+				break;
+		}
+	}
+}

+ 1 - 0
lib/enums.ts

@@ -64,6 +64,7 @@ export enum SerializationType {
 	Binary = "binary",
 	BinaryUTF8 = "binary-utf8",
 	JSON = "json",
+	None = "raw",
 }
 
 export enum SocketEventType {

+ 2 - 3
lib/exports.ts

@@ -9,9 +9,8 @@ export type {
 	CallOption,
 } from "./optionInterfaces";
 export type { UtilSupportsObj } from "./util";
-export type { BaseConnection, BaseConnectionEvents } from "./baseconnection";
-export type { DataConnection, DataConnectionEvents } from "./dataconnection";
-export type { MediaConnection, MediaConnectionEvents } from "./mediaconnection";
+export type { DataConnection } from "./dataconnection/DataConnection";
+export type { MediaConnection } from "./mediaconnection";
 export type { LogLevel } from "./logger";
 export type {
 	ConnectionType,

+ 11 - 15
lib/mediaconnection.ts

@@ -35,7 +35,6 @@ export class MediaConnection extends BaseConnection<MediaConnectionEvents> {
 	private _negotiator: Negotiator<MediaConnectionEvents, MediaConnection>;
 	private _localStream: MediaStream;
 	private _remoteStream: MediaStream;
-	private _dc: RTCDataChannel;
 
 	/**
 	 * For media connections, this is always 'media'.
@@ -47,14 +46,11 @@ export class MediaConnection extends BaseConnection<MediaConnectionEvents> {
 	get localStream(): MediaStream {
 		return this._localStream;
 	}
+
 	get remoteStream(): MediaStream {
 		return this._remoteStream;
 	}
 
-	get dataChannel(): RTCDataChannel {
-		return this._dc;
-	}
-
 	constructor(peerId: string, provider: Peer, options: any) {
 		super(peerId, provider, options);
 
@@ -75,7 +71,7 @@ export class MediaConnection extends BaseConnection<MediaConnectionEvents> {
 
 	/** Called by the Negotiator when the DataChannel is ready. */
 	override _initializeDataChannel(dc: RTCDataChannel): void {
-		this._dc = dc;
+		this.dataChannel = dc;
 
 		this.dataChannel.onopen = () => {
 			logger.log(`DC#${this.connectionId} dc connection success`);
@@ -117,15 +113,15 @@ export class MediaConnection extends BaseConnection<MediaConnectionEvents> {
 	}
 
 	/**
-	 * When receiving a {@apilink PeerEvents | `call`} event on a peer, you can call
-	 * `answer` on the media connection provided by the callback to accept the call
-	 * and optionally send your own media stream.
-
-	 *
-	 * @param stream A WebRTC media stream.
-	 * @param options
-	 * @returns
-	 */
+     * When receiving a {@apilink PeerEvents | `call`} event on a peer, you can call
+     * `answer` on the media connection provided by the callback to accept the call
+     * and optionally send your own media stream.
+
+     *
+     * @param stream A WebRTC media stream.
+     * @param options
+     * @returns
+     */
 	answer(stream?: MediaStream, options: AnswerOption = {}): void {
 		if (this._localStream) {
 			logger.warn(

+ 4 - 16
lib/negotiator.ts

@@ -1,7 +1,7 @@
 import { util } from "./util";
 import logger from "./logger";
 import { MediaConnection } from "./mediaconnection";
-import { DataConnection } from "./dataconnection";
+import { DataConnection } from "./dataconnection/DataConnection";
 import { ConnectionType, PeerErrorType, ServerMessageType } from "./enums";
 import { BaseConnection, BaseConnectionEvents } from "./baseconnection";
 import { ValidEventTypes } from "eventemitter3";
@@ -323,26 +323,14 @@ export class Negotiator<
 	}
 
 	/** Handle a candidate. */
-	async handleCandidate(ice: any): Promise<void> {
+	async handleCandidate(ice: RTCIceCandidate) {
 		logger.log(`handleCandidate:`, ice);
 
-		const candidate = ice.candidate;
-		const sdpMLineIndex = ice.sdpMLineIndex;
-		const sdpMid = ice.sdpMid;
-		const peerConnection = this.connection.peerConnection;
-		const provider = this.connection.provider;
-
 		try {
-			await peerConnection.addIceCandidate(
-				new RTCIceCandidate({
-					sdpMid: sdpMid,
-					sdpMLineIndex: sdpMLineIndex,
-					candidate: candidate,
-				}),
-			);
+			await this.connection.peerConnection.addIceCandidate(ice);
 			logger.log(`Added ICE candidate for:${this.connection.peer}`);
 		} catch (err) {
-			provider.emitError(PeerErrorType.WebRTC, err);
+			this.connection.provider.emitError(PeerErrorType.WebRTC, err);
 			logger.log("Failed to handleCandidate, ", err);
 		}
 	}

+ 46 - 11
lib/peer.ts

@@ -3,7 +3,7 @@ import { util } from "./util";
 import logger, { LogLevel } from "./logger";
 import { Socket } from "./socket";
 import { MediaConnection } from "./mediaconnection";
-import { DataConnection } from "./dataconnection";
+import { DataConnection } from "./dataconnection/DataConnection";
 import {
 	ConnectionType,
 	PeerErrorType,
@@ -17,6 +17,9 @@ import type {
 	PeerJSOption,
 	CallOption,
 } from "./optionInterfaces";
+import { BinaryJSConnection } from "./dataconnection/BufferedConnection/BinaryJSConnection";
+import { RawConnection } from "./dataconnection/BufferedConnection/RawConnection";
+import { JsonConnection } from "./dataconnection/BufferedConnection/JsonConnection";
 
 class PeerOptions implements PeerJSOption {
 	/**
@@ -60,6 +63,7 @@ class PeerOptions implements PeerJSOption {
 	pingInterval?: number;
 	referrerPolicy?: ReferrerPolicy;
 	logFunction?: (logLevel: LogLevel, ...rest: any[]) => void;
+	serializers?: SerializerMapping;
 }
 
 class PeerError extends Error {
@@ -78,6 +82,14 @@ class PeerError extends Error {
 }
 export type { PeerError, PeerOptions };
 
+export type SerializerMapping = {
+	[key: string]: new (
+		peerId: string,
+		provider: Peer,
+		options: any,
+	) => DataConnection;
+};
+
 export type PeerEvents = {
 	/**
 	 * Emitted when a connection to the PeerServer is established.
@@ -114,6 +126,14 @@ export type PeerEvents = {
 export class Peer extends EventEmitter<PeerEvents> {
 	private static readonly DEFAULT_KEY = "peerjs";
 
+	private readonly _serializers: SerializerMapping = {
+		raw: RawConnection,
+		json: JsonConnection,
+		binary: BinaryJSConnection,
+		"binary-utf8": BinaryJSConnection,
+
+		default: BinaryJSConnection,
+	};
 	private readonly _options: PeerOptions;
 	private readonly _api: API;
 	private readonly _socket: Socket;
@@ -225,9 +245,11 @@ export class Peer extends EventEmitter<PeerEvents> {
 			token: util.randomToken(),
 			config: util.defaultConfig,
 			referrerPolicy: "strict-origin-when-cross-origin",
+			serializers: {},
 			...options,
 		};
 		this._options = options;
+		this._serializers = { ...this._serializers, ...this.options.serializers };
 
 		// Detect relative URL host.
 		if (this._options.host === "/") {
@@ -394,15 +416,20 @@ export class Peer extends EventEmitter<PeerEvents> {
 					this._addConnection(peerId, connection);
 					this.emit("call", mediaConnection);
 				} else if (payload.type === ConnectionType.Data) {
-					const dataConnection = new DataConnection(peerId, this, {
-						connectionId: connectionId,
-						_payload: payload,
-						metadata: payload.metadata,
-						label: payload.label,
-						serialization: payload.serialization,
-						reliable: payload.reliable,
-					});
+					const dataConnection = new this._serializers[payload.serialization](
+						peerId,
+						this,
+						{
+							connectionId: connectionId,
+							_payload: payload,
+							metadata: payload.metadata,
+							label: payload.label,
+							serialization: payload.serialization,
+							reliable: payload.reliable,
+						},
+					);
 					connection = dataConnection;
+
 					this._addConnection(peerId, connection);
 					this.emit("connection", dataConnection);
 				} else {
@@ -473,7 +500,11 @@ export class Peer extends EventEmitter<PeerEvents> {
 	 * @param peer The brokering ID of the remote peer (their {@apilink Peer.id}).
 	 * @param options for specifying details about Peer Connection
 	 */
-	connect(peer: string, options: PeerConnectOption = {}): DataConnection {
+	connect(peer: string, options: PeerConnectOption): DataConnection {
+		options = {
+			serialization: "default",
+			...options,
+		};
 		if (this.disconnected) {
 			logger.warn(
 				"You cannot connect to a new Peer because you called " +
@@ -488,7 +519,11 @@ export class Peer extends EventEmitter<PeerEvents> {
 			return;
 		}
 
-		const dataConnection = new DataConnection(peer, this, options);
+		const dataConnection = new this._serializers[options.serialization](
+			peer,
+			this,
+			options,
+		);
 		this._addConnection(peer, dataConnection);
 		return dataConnection;
 	}

+ 1 - 1
lib/util.ts

@@ -132,7 +132,7 @@ export class Util {
 
 	chunk(
 		blob: ArrayBuffer,
-	): { __peerData: number; n: number; total: number; data: ArrayBuffer }[] {
+	): { __peerData: number; n: number; total: number; data: Uint8Array }[] {
 		const chunks = [];
 		const size = blob.byteLength;
 		const total = Math.ceil(size / util.chunkedMTU);