Cbor.ts 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import type { Peer } from "../../peer.js";
  2. import { Decoder, Encoder } from "cbor-x";
  3. import { StreamConnection } from "./StreamConnection.js";
  4. const NullValue = Symbol.for(null);
  5. function concatUint8Array(buffer1: Uint8Array, buffer2: Uint8Array) {
  6. const tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength);
  7. tmp.set(buffer1, 0);
  8. tmp.set(buffer2, buffer1.byteLength);
  9. return new Uint8Array(tmp.buffer);
  10. }
  11. const iterateOver = async function* (stream: ReadableStream) {
  12. const reader = stream.getReader();
  13. try {
  14. while (true) {
  15. const { done, value } = await reader.read();
  16. if (done) return;
  17. yield value;
  18. }
  19. } finally {
  20. reader.releaseLock();
  21. }
  22. };
  23. export class Cbor extends StreamConnection {
  24. readonly serialization = "Cbor";
  25. private _encoder = new Encoder();
  26. private _decoder = new Decoder();
  27. private _inc;
  28. private _decoderStream = new TransformStream<ArrayBuffer, unknown>({
  29. transform: (abchunk, controller) => {
  30. let chunk = new Uint8Array(abchunk);
  31. if (this._inc) {
  32. chunk = concatUint8Array(this._inc, chunk);
  33. this._inc = null;
  34. }
  35. let values;
  36. try {
  37. values = this._decoder.decodeMultiple(chunk);
  38. } catch (error) {
  39. if (error.incomplete) {
  40. this._inc = chunk.subarray(error.lastPosition);
  41. values = error.values;
  42. } else throw error;
  43. } finally {
  44. for (let value of values || []) {
  45. if (value === null) value = NullValue;
  46. controller.enqueue(value);
  47. }
  48. }
  49. },
  50. });
  51. constructor(peerId: string, provider: Peer, options: any) {
  52. super(peerId, provider, { ...options, reliable: true });
  53. void this._rawReadStream.pipeTo(this._decoderStream.writable);
  54. (async () => {
  55. for await (const msg of iterateOver(this._decoderStream.readable)) {
  56. if (msg.__peerData?.type === "close") {
  57. this.close();
  58. return;
  59. }
  60. this.emit("data", msg);
  61. }
  62. })();
  63. }
  64. protected override _send(data) {
  65. return this.writer.write(this._encoder.encode(data));
  66. }
  67. }