Prechádzať zdrojové kódy

GramJS: Fix images loading: add `async-mutex`; Fix signed LE ints; Bring back `readExactly`

painor 5 rokov pred
rodič
commit
279aacaa52

+ 5 - 0
package-lock.json

@@ -1738,6 +1738,11 @@
       "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==",
       "dev": true
     },
+    "async-mutex": {
+      "version": "0.1.4",
+      "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.1.4.tgz",
+      "integrity": "sha512-zVWTmAnxxHaeB2B1te84oecI8zTDJ/8G49aVBblRX6be0oq6pAybNcUSxwfgVOmOjSCvN4aYZAqwtyNI8e1YGw=="
+    },
     "asynckit": {
       "version": "0.4.0",
       "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",

+ 10 - 0
package.json

@@ -52,6 +52,7 @@
     "typescript": "^3.6.4"
   },
   "dependencies": {
+<<<<<<< HEAD
 <<<<<<< HEAD
     "aes-js": "^3.1.2",
     "crc": "^3.8.0",
@@ -64,10 +65,14 @@
     "string-format": "^2.0.0",
     "td": "^0.3.2",
 =======
+=======
+    "async-mutex": "^0.1.4",
+>>>>>>> 1a0b5c54... GramJS: Fix images loading: add `async-mutex`; Fix signed LE ints; Bring back `readExactly`
     "big-integer": "latest",
     "croppie": "^2.6.4",
     "emoji-regex": "^8.0.0",
     "events": "^3.0.0",
+<<<<<<< HEAD
 >>>>>>> 1ab480fe... Gram JS: Remove deps; Fix Factorizator and remove leemon lib
     "tdweb": "^1.5.0",
     "websocket": "^1.0.30",
@@ -76,5 +81,10 @@
 =======
     "pako": "latest"
 >>>>>>> f8f1a405... GramJS : Remove zlib and other small fixes.
+=======
+    "pako": "latest",
+    "tdweb": "^1.5.0",
+    "websocket": "^1.0.30"
+>>>>>>> 1a0b5c54... GramJS: Fix images loading: add `async-mutex`; Fix signed LE ints; Bring back `readExactly`
   }
 }

+ 42 - 0
src/api/gramjs/connectors/media.ts

@@ -0,0 +1,42 @@
+import { Api as GramJs, TelegramClient } from '../../../lib/gramjs';
+
+import localDb from '../localDb';
+import { getEntityTypeById } from '../inputHelpers';
+
+// TODO Await client ready.
+export default function downloadMedia(client: TelegramClient, url: string): Promise<Buffer | null> | null {
+  const mediaMatch = url.match(/(avatar|msg)(-?\d+)/);
+  if (!mediaMatch) {
+    return null;
+  }
+
+  let entityType = mediaMatch[1];
+  let entityId: number = Number(mediaMatch[2]);
+  let entity: GramJs.User | GramJs.Chat | GramJs.Channel | GramJs.Message | undefined;
+
+  if (entityType === 'avatar') {
+    entityType = getEntityTypeById(entityId);
+    entityId = Math.abs(entityId);
+  }
+
+  switch (entityType) {
+    case 'channel':
+    case 'chat':
+      entity = localDb.chats[entityId];
+      break;
+    case 'user':
+      entity = localDb.users[entityId];
+      break;
+    case 'msg':
+      entity = localDb.messages[entityId];
+      break;
+  }
+
+  if (!entity) {
+    return null;
+  }
+
+  return entityType === 'msg'
+    ? client.downloadMedia(entity, { sizeType: 'x' })
+    : client.downloadProfilePhoto(entity, false);
+}

+ 60 - 21
src/lib/gramjs/Helpers.js

@@ -17,11 +17,28 @@ function readBigIntFromBuffer(buffer, little = true, signed = false) {
     }
     let bigInt = BigInt(randBuffer.toString('hex'), 16)
     if (signed && Math.floor(bigInt.toString('2').length / 8) >= bytesNumber) {
-        bigInt = bigInt.subtract(BigInt(2).pow(BigInt(bytesNumber * 8)))
+        bigInt = bigInt.subtract(BigInt(2)
+            .pow(BigInt(bytesNumber * 8)))
     }
     return bigInt
 }
 
+/**
+ * Special case signed little ints
+ * @param big
+ * @param number
+ * @returns {Buffer}
+ */
+function toSignedLittleBuffer(big, number=8) {
+    const bigNumber = BigInt(big)
+    const byteArray = []
+    for (let i = 0; i < number; i++) {
+        byteArray[i] = bigNumber.shiftRight(8*i).and(255)
+    }
+    return Buffer.from(byteArray)
+}
+
+
 /**
  * converts a big int to a buffer
  * @param bigInt {BigInteger}
@@ -44,10 +61,11 @@ function readBufferFromBigInt(bigInt, bytesNumber, little = true, signed = false
     let below = false
     if (bigInt.lesser(BigInt(0))) {
         below = true
-        bigInt = bigInt.subtract(bigInt.add(bigInt))
+        bigInt = bigInt.abs()
     }
 
-    const hex = bigInt.toString('16').padStart(bytesNumber * 2, '0')
+    const hex = bigInt.toString('16')
+        .padStart(bytesNumber * 2, '0')
     let l = Buffer.from(hex, 'hex')
     if (little) {
         l = l.reverse()
@@ -55,8 +73,19 @@ function readBufferFromBigInt(bigInt, bytesNumber, little = true, signed = false
 
     if (signed && below) {
         if (little) {
-            l[0] = 256 - l[0]
-            for (let i = 1; i < l.length; i++) {
+            let reminder = false
+            if (l[0] !== 0) {
+                l[0] -= 1
+            }
+            for (let i = 0; i < l.length; i++) {
+                if (l[i] === 0) {
+                    reminder = true
+                    continue
+                }
+                if (reminder) {
+                    l[i] -= 1
+                    reminder = false
+                }
                 l[i] = 255 - l[i]
             }
         } else {
@@ -93,9 +122,10 @@ function mod(n, m) {
  * @param m {BigInt}
  * @returns {BigInt}
  */
-function bigIntMod(n,m) {
+function bigIntMod(n, m) {
     return ((n.remainder(m)).add(m)).remainder(m)
 }
+
 /**
  * Generates a random bytes array
  * @param count
@@ -117,14 +147,17 @@ function generateRandomBytes(count) {
 async function calcKey(sharedKey, msgKey, client) {
     const x = client === true ? 0 : 8
     const [sha1a, sha1b, sha1c, sha1d] = await Promise.all([
-        sha1(Buffer.concat([ msgKey, sharedKey.slice(x, x + 32) ])),
-        sha1(Buffer.concat([ sharedKey.slice(x + 32, x + 48), msgKey, sharedKey.slice(x + 48, x + 64) ])),
-        sha1(Buffer.concat([ sharedKey.slice(x + 64, x + 96), msgKey ])),
-        sha1(Buffer.concat([ msgKey, sharedKey.slice(x + 96, x + 128) ]))
+        sha1(Buffer.concat([msgKey, sharedKey.slice(x, x + 32)])),
+        sha1(Buffer.concat([sharedKey.slice(x + 32, x + 48), msgKey, sharedKey.slice(x + 48, x + 64)])),
+        sha1(Buffer.concat([sharedKey.slice(x + 64, x + 96), msgKey])),
+        sha1(Buffer.concat([msgKey, sharedKey.slice(x + 96, x + 128)]))
     ])
-    const key = Buffer.concat([ sha1a.slice(0, 8), sha1b.slice(8, 20), sha1c.slice(4, 16) ])
-    const iv = Buffer.concat([ sha1a.slice(8, 20), sha1b.slice(0, 8), sha1c.slice(16, 20), sha1d.slice(0, 8) ])
-    return { key, iv }
+    const key = Buffer.concat([sha1a.slice(0, 8), sha1b.slice(8, 20), sha1c.slice(4, 16)])
+    const iv = Buffer.concat([sha1a.slice(8, 20), sha1b.slice(0, 8), sha1c.slice(16, 20), sha1d.slice(0, 8)])
+    return {
+        key,
+        iv
+    }
 }
 
 /**
@@ -134,16 +167,19 @@ async function calcKey(sharedKey, msgKey, client) {
  * @returns {{key: Buffer, iv: Buffer}}
  */
 async function generateKeyDataFromNonce(serverNonce, newNonce) {
-    serverNonce = readBufferFromBigInt(serverNonce, 16, true, true)
-    newNonce = readBufferFromBigInt(newNonce, 32, true, true)
+    serverNonce = toSignedLittleBuffer(serverNonce, 16)
+    newNonce = toSignedLittleBuffer(newNonce, 32)
     const [hash1, hash2, hash3] = await Promise.all([
-        sha1(Buffer.concat([ newNonce, serverNonce ])),
-        sha1(Buffer.concat([ serverNonce, newNonce ])),
-        sha1(Buffer.concat([ newNonce, newNonce ]))
+        sha1(Buffer.concat([newNonce, serverNonce])),
+        sha1(Buffer.concat([serverNonce, newNonce])),
+        sha1(Buffer.concat([newNonce, newNonce]))
     ])
-    const keyBuffer = Buffer.concat([ hash1, hash2.slice(0, 12) ])
-    const ivBuffer = Buffer.concat([ hash2.slice(12, 20), hash3, newNonce.slice(0, 4) ])
-    return { key: keyBuffer, iv: ivBuffer }
+    const keyBuffer = Buffer.concat([hash1, hash2.slice(0, 12)])
+    const ivBuffer = Buffer.concat([hash2.slice(12, 20), hash3, newNonce.slice(0, 4)])
+    return {
+        key: keyBuffer,
+        iv: ivBuffer
+    }
 }
 
 /**
@@ -205,6 +241,7 @@ function getByteArray(integer, signed = false) {
     const byteLength = Math.floor((bits + 8 - 1) / 8)
     return readBufferFromBigInt(BigInt(integer), byteLength, false, signed)
 }
+
 /**
  * returns a random int from min (inclusive) and max (inclusive)
  * @param min
@@ -274,6 +311,7 @@ function crc32(buf) {
     }
     return (crc ^ (-1)) >>> 0
 }
+
 module.exports = {
     readBigIntFromBuffer,
     readBufferFromBigInt,
@@ -291,4 +329,5 @@ module.exports = {
     sleep,
     getByteArray,
     isArrayLike,
+    toSignedLittleBuffer
 }

+ 2 - 2
src/lib/gramjs/crypto/AuthKey.js

@@ -1,4 +1,4 @@
-const { sha1, readBufferFromBigInt, readBigIntFromBuffer } = require('../Helpers')
+const { sha1, toSignedLittleBuffer,readBufferFromBigInt, readBigIntFromBuffer } = require('../Helpers')
 const BinaryReader = require('../extensions/BinaryReader')
 const { sleep } = require('../Helpers')
 
@@ -55,7 +55,7 @@ class AuthKey {
      * @returns {bigint}
      */
     async calcNewNonceHash(newNonce, number) {
-        newNonce = readBufferFromBigInt(newNonce, 32, true, true)
+        newNonce = toSignedLittleBuffer(newNonce, 32)
         const n = Buffer.alloc(1)
         n.writeUInt8(number, 0)
         const data = Buffer.concat([newNonce,

+ 37 - 29
src/lib/gramjs/extensions/PromisedWebSockets.js

@@ -1,3 +1,7 @@
+const BinaryReader = require('../extensions/BinaryReader')
+const Mutex = require('async-mutex').Mutex
+const mutex = new Mutex()
+
 const WebSocketClient = require('websocket').w3cwebsocket
 
 const closeError = new Error('WebSocket was closed')
@@ -13,18 +17,17 @@ class PromisedWebSockets {
         this.closed = true
     }
 
-    // TODO This hangs in certain situations (issues with big files) and breaks subsequent calls.
-    // async readExactly(number) {
-    //     let readData = Buffer.alloc(0)
-    //     while (true) {
-    //         const thisTime = await this.read(number)
-    //         readData = Buffer.concat([readData, thisTime])
-    //         number = number - thisTime.length
-    //         if (!number) {
-    //             return readData
-    //         }
-    //     }
-    // }
+    async readExactly(number) {
+        let readData = Buffer.alloc(0)
+        while (true) {
+            const thisTime = await this.read(number)
+            readData = Buffer.concat([readData, thisTime])
+            number = number - thisTime.length
+            if (!number) {
+                return readData
+            }
+        }
+    }
 
     async read(number) {
         if (this.closed) {
@@ -67,28 +70,27 @@ class PromisedWebSockets {
     async connect(port, ip) {
         console.log('trying to connect')
         this.stream = Buffer.alloc(0)
-
         this.canRead = new Promise((resolve) => {
             this.resolveRead = resolve
         })
         this.closed = false
         this.website = this.getWebSocketLink(ip, port)
         this.client = new WebSocketClient(this.website, 'binary')
-        return new Promise(function (resolve, reject) {
-            this.client.onopen = function () {
+        return new Promise((resolve, reject) => {
+            this.client.onopen = () => {
                 this.receive()
                 resolve(this)
-            }.bind(this)
-            this.client.onerror = function (error) {
+            }
+            this.client.onerror = (error) => {
                 reject(error)
             }
-            this.client.onclose = function () {
+            this.client.onclose = () => {
                 if (this.client.closed) {
                     this.resolveRead(false)
                     this.closed = true
                 }
-            }.bind(this)
-        }.bind(this))
+            }
+        })
     }
 
     write(data) {
@@ -105,16 +107,22 @@ class PromisedWebSockets {
     }
 
     async receive() {
-        this.client.onmessage = async function (message) {
-            let data
-            if (this.isBrowser) {
-                data = Buffer.from(await new Response(message.data).arrayBuffer())
-            } else {
-                data = Buffer.from(message.data)
+        this.client.onmessage = async (message) => {
+            const release = await mutex.acquire()
+            try {
+                let data
+                if (this.isBrowser) {
+                    data = Buffer.from(await new Response(message.data).arrayBuffer())
+                } else {
+                    data = Buffer.from(message.data)
+                }
+
+                this.stream = Buffer.concat([this.stream, data])
+                this.resolveRead(true)
+            } finally {
+                release()
             }
-            this.stream = Buffer.concat([this.stream, data])
-            this.resolveRead(true)
-        }.bind(this)
+        }
     }
 }
 

+ 1 - 1
src/lib/gramjs/network/Authenticator.js

@@ -87,7 +87,7 @@ async function doAuthentication(sender, log) {
     }
 
     if (serverDhParams instanceof constructors.ServerDHParamsFail) {
-        const sh = await Helpers.sha1(Helpers.readBufferFromBigInt(newNonce, 32, true, true).slice(4, 20))
+        const sh = await Helpers.sha1(Helpers.toSignedLittleBuffer(newNonce, 32).slice(4, 20))
         const nnh = Helpers.readBigIntFromBuffer(sh, true, true)
         if (serverDhParams.newNonceHash.neq(nnh)) {
             throw new SecurityError('Step 2 invalid DH fail nonce from server')

+ 2 - 2
src/lib/gramjs/network/MTProtoPlainSender.js

@@ -7,7 +7,7 @@ const MTProtoState = require('./MTProtoState')
 const BinaryReader = require('../extensions/BinaryReader')
 const { InvalidBufferError } = require('../errors/Common')
 const BigInt = require('big-integer')
-const { readBufferFromBigInt } = require("../Helpers")
+const { toSignedLittleBuffer } = require("../Helpers")
 
 /**
  * MTProto Mobile Protocol plain sender (https://core.telegram.org/mtproto/description#unencrypted-messages)
@@ -32,7 +32,7 @@ class MTProtoPlainSender {
 
         let body = request.getBytes()
         let msgId = this._state._getNewMsgId()
-        const m = readBufferFromBigInt(msgId, 8, true, true)
+        const m = toSignedLittleBuffer(msgId, 8)
         const b = Buffer.alloc(4)
         b.writeInt32LE(body.length, 0)
 

+ 4 - 4
src/lib/gramjs/network/MTProtoState.js

@@ -6,7 +6,7 @@ const { TLMessage } = require('../tl/core')
 const { SecurityError, InvalidBufferError } = require('../errors/Common')
 const { InvokeAfterMsgRequest } = require('../tl').requests
 const BigInt = require('big-integer')
-const { readBufferFromBigInt } = require("../Helpers")
+const { toSignedLittleBuffer,readBufferFromBigInt } = require("../Helpers")
 const { readBigIntFromBuffer } = require("../Helpers")
 
 class MTProtoState {
@@ -103,7 +103,7 @@ class MTProtoState {
         s.writeInt32LE(seqNo, 0)
         const b = Buffer.alloc(4)
         b.writeInt32LE(body.length, 0)
-        const m = readBufferFromBigInt(msgId, 8, true, true)
+        const m = toSignedLittleBuffer(msgId, 8)
         buffer.write(Buffer.concat([m, s, b]))
         buffer.write(body)
         return msgId
@@ -116,8 +116,8 @@ class MTProtoState {
      */
     async encryptMessageData(data) {
         await this.authKey.waitForKey()
-        const s = readBufferFromBigInt(this.salt,8,true,true)
-        const i = readBufferFromBigInt(this.id,8,true,true)
+        const s = toSignedLittleBuffer(this.salt,8)
+        const i = toSignedLittleBuffer(this.id,8)
         data = Buffer.concat([Buffer.concat([s,i]), data])
         const padding = Helpers.generateRandomBytes(Helpers.mod(-(data.length + 12), 16) + 12)
         // Being substr(what, offset, length); x = 0 for client

+ 165 - 0
src/lib/gramjs/network/connection/Connection.js

@@ -0,0 +1,165 @@
+const PromisedWebSockets = require('../../extensions/PromisedWebSockets')
+const AsyncQueue = require('../../extensions/AsyncQueue')
+
+/**
+ * The `Connection` class is a wrapper around ``asyncio.open_connection``.
+ *
+ * Subclasses will implement different transport modes as atomic operations,
+ * which this class eases doing since the exposed interface simply puts and
+ * gets complete data payloads to and from queues.
+ *
+ * The only error that will raise from send and receive methods is
+ * ``ConnectionError``, which will raise when attempting to send if
+ * the client is disconnected (includes remote disconnections).
+ */
+class Connection {
+    PacketCodecClass = null
+
+    constructor(ip, port, dcId, loggers) {
+        this._ip = ip
+        this._port = port
+        this._dcId = dcId
+        this._log = loggers
+        this._connected = false
+        this._sendTask = null
+        this._recvTask = null
+        this._codec = null
+        this._obfuscation = null // TcpObfuscated and MTProxy
+        this._sendArray = new AsyncQueue()
+        this._recvArray = new AsyncQueue()
+        //this.socket = new PromiseSocket(new Socket())
+
+        this.socket = new PromisedWebSockets()
+    }
+
+    async _connect() {
+        this._log.debug('Connecting')
+        this._codec = new this.PacketCodecClass(this)
+        await this.socket.connect(this._port, this._ip,this)
+        this._log.debug('Finished connecting')
+        // await this.socket.connect({host: this._ip, port: this._port});
+        await this._initConn()
+    }
+
+    async connect() {
+        await this._connect()
+        this._connected = true
+        if (!this._sendTask) {
+            this._sendTask = this._sendLoop()
+        }
+        this._recvTask = this._recvLoop()
+    }
+
+    async disconnect() {
+        this._connected = false
+        await this.socket.close()
+    }
+
+    async send(data) {
+        if (!this._connected) {
+            throw new Error('Not connected')
+        }
+        await this._sendArray.push(data)
+    }
+
+    async recv() {
+        while (this._connected) {
+            const result = await this._recvArray.pop()
+            // null = sentinel value = keep trying
+            if (result) {
+                return result
+            }
+        }
+        throw new Error('Not connected')
+    }
+
+    async _sendLoop() {
+        // TODO handle errors
+        try {
+            while (this._connected) {
+                const data = await this._sendArray.pop()
+                await this._send(data)
+            }
+        } catch (e) {
+            console.log(e)
+            this._log.info('The server closed the connection while sending')
+        }
+    }
+
+    async _recvLoop() {
+        let data
+        while (this._connected) {
+            try {
+                data = await this._recv()
+                if (!data) {
+                    return
+                }
+            } catch (e) {
+                console.log(e)
+                this._log.info('an error occured')
+                return
+            }
+            await this._recvArray.push(data)
+        }
+    }
+
+    async _initConn() {
+        if (this._codec.tag) {
+            await this.socket.write(this._codec.tag)
+        }
+    }
+
+    async _send(data) {
+        const encodedPacket = this._codec.encodePacket(data)
+        this.socket.write(encodedPacket)
+    }
+
+    async _recv() {
+        return await this._codec.readPacket(this.socket)
+    }
+
+    toString() {
+        return `${this._ip}:${this._port}/${this.constructor.name.replace('Connection', '')}`
+    }
+}
+
+class ObfuscatedConnection extends Connection {
+    ObfuscatedIO = null
+
+    async _initConn() {
+        this._obfuscation = new this.ObfuscatedIO(this)
+        this.socket.write(this._obfuscation.header)
+    }
+
+    _send(data) {
+        this._obfuscation.write(this._codec.encodePacket(data))
+    }
+
+
+    async _recv() {
+        return await this._codec.readPacket(this._obfuscation)
+    }
+}
+
+class PacketCodec {
+    constructor(connection) {
+        this._conn = connection
+    }
+
+    encodePacket(data) {
+        throw new Error('Not Implemented')
+
+        // Override
+    }
+
+    async readPacket(reader) {
+        // override
+        throw new Error('Not Implemented')
+    }
+}
+
+module.exports = {
+    Connection,
+    PacketCodec,
+    ObfuscatedConnection,
+}

+ 2 - 1
src/lib/gramjs/network/connection/TCPAbridged.js

@@ -28,7 +28,8 @@ class AbridgedPacketCodec extends PacketCodec {
         const readData = await reader.read(1)
         let length = readData[0]
         if (length >= 127) {
-            length = Buffer.concat([await reader.read(3), Buffer.alloc(1)]).readInt32LE(0)
+            length = Buffer.concat([await reader.read(3), Buffer.alloc(1)])
+                .readInt32LE(0)
         }
 
         return await reader.read(length << 2)

+ 1 - 1
src/lib/gramjs/network/connection/TCPObfuscated.js

@@ -58,7 +58,7 @@ class ObfuscatedIO {
     }
 
     async read(n) {
-        const data = await this.connection.read(n)
+        const data = await this.connection.readExactly(n)
         return this._decrypt.encrypt(data)
     }
 

+ 10 - 6
src/lib/gramjs/tl/gramJsApi.js

@@ -11,6 +11,10 @@ const {
     serializeBytes,
     serializeDate
 } = require('./generationHelpers')
+<<<<<<< HEAD:src/lib/gramjs/tl/gramJsApi.js
+=======
+const { readBufferFromBigInt,toSignedLittleBuffer } = require('../Helpers')
+>>>>>>> 1a0b5c54... GramJS: Fix images loading: add `async-mutex`; Fix signed LE ints; Bring back `readExactly`:src/lib/gramjs/tl/api.js
 
 const NAMED_AUTO_CASTS = new Set([
     'chatId,int'
@@ -65,11 +69,11 @@ function argToBytes(x, type) {
             const i = Buffer.alloc(4)
             return i.writeInt32LE(x, 0)
         case 'long':
-            return readBufferFromBigInt(x, 8, true, true)
+            return toSignedLittleBuffer(x, 8)
         case 'int128':
-            return readBufferFromBigInt(x, 16, true, true)
+            return toSignedLittleBuffer(x, 16)
         case 'int256':
-            return readBufferFromBigInt(x, 32, true, true)
+            return toSignedLittleBuffer(x, 32)
         case 'double':
             const d = Buffer.alloc(8)
             return d.writeDoubleLE(x, 0)
@@ -277,13 +281,13 @@ function createClasses(classesType, params) {
                                     buffers.push(i)
                                     break
                                 case 'long':
-                                    buffers.push(readBufferFromBigInt(this[arg], 8, true, true))
+                                    buffers.push(toSignedLittleBuffer(this[arg], 8))
                                     break
                                 case 'int128':
-                                    buffers.push(readBufferFromBigInt(this[arg], 16, true, true))
+                                    buffers.push(toSignedLittleBuffer(this[arg], 16))
                                     break
                                 case 'int256':
-                                    buffers.push(readBufferFromBigInt(this[arg], 32, true, true))
+                                    buffers.push(toSignedLittleBuffer(this[arg], 32))
                                     break
                                 case 'double':
                                     const d = Buffer.alloc(8)