瀏覽代碼

Refactor networking
Add core

painor 5 年之前
父節點
當前提交
acc9ad279f

+ 15 - 2
gramjs/errors/RPCBaseErrors.js

@@ -8,13 +8,13 @@ class RPCError extends Error {
         super("RPCError {0}: {1}{2}"
             .replace("{0}", code)
             .replace("{1}", message)
-            .replace("{2}", RPCError._fmt_request(request)));
+            .replace("{2}", RPCError._fmtRequest(request)));
         this.code = code;
         this.message = message;
     }
 
     static _fmtRequest(request) {
-        return ' (caused by {})'.format(request.constructor.name)
+        return ` (caused by ${request.constructor.name})`
     }
 }
 
@@ -103,3 +103,16 @@ class TimedOutError extends RPCError {
     code = 503;  // Only witnessed as -503
     message = 'Timeout';
 }
+
+module.exports = {
+    RPCError,
+    InvalidDCError,
+    BadRequestError,
+    UnauthorizedError,
+    ForbiddenError,
+    NotFoundError,
+    AuthKeyError,
+    FloodError,
+    ServerError,
+    TimedOutError
+}

+ 12 - 0
gramjs/errors/index.js

@@ -0,0 +1,12 @@
+/**
+ * Converts a Telegram's RPC Error to a Python error.
+ * @param rpcError the RPCError instance
+ * @param request the request that caused this error
+ * @constructor the RPCError as a Python exception that represents this error
+ */
+const {rpcErrorObjects} = require("./rpcerrorlist");
+
+function RPCMessageToError(rpcError, request) {
+    //Try to get the error by direct look-up, otherwise regex
+    let cls = rpcErrorObjects[rpcError.errorMessage]
+}

+ 77 - 0
gramjs/extensions/MessagePacker.js

@@ -0,0 +1,77 @@
+const Helpers = require("../utils/Helpers");
+const {TLRequest} = require("../tl/tlobject");
+
+class MessagePacker {
+    constructor(state,logger) {
+        this._state = state;
+        this._queue = [];
+        this._ready = false;
+    }
+
+    append(state) {
+        this._queue.push(state);
+        this._ready = true;
+    }
+
+    extend(states) {
+        for (let state of states) {
+            this._queue.push(state);
+        }
+        this._ready = true;
+    }
+
+    async get() {
+        if (!this._queue.length) {
+            this._ready = false;
+            while (!this._ready) {
+                await Helpers.sleep(100);
+            }
+        }
+        let data;
+        let buffer = [];
+
+        let batch = [];
+        let size = 0;
+
+        while (this._queue.length && batch.length <= 100) {
+            let state = this._queue.shift();
+            size += state.length + 12;
+            if (size <= 1044448 - 8) {
+                state.msgId = this._state.writeDataAsMessage(
+                    buffer, state.data, state.request instanceof TLRequest,
+                    state.after.msgId
+                )
+                batch.push(state);
+                //log
+                continue;
+            }
+            if (batch.length) {
+                this._queue.unshift(state);
+                break;
+            }
+
+            size = 0;
+            continue
+        }
+        if (!batch.length) {
+            return null;
+        }
+        if (batch.length > 1) {
+            data = Buffer.concat([struct.pack(
+                '<Ii', 0x73f1f8dc, batch.length
+            ), buffer[0]]);
+            buffer = [];
+            let containerId = this._state.writeDataAsMessage(
+                buffer, data, false
+            );
+            for (let s of batch) {
+                s.containerId = containerId;
+            }
+        }
+
+        data = buffer[0];
+        return {batch, data}
+    }
+}
+
+module.exports = MessagePacker;

+ 6 - 2
gramjs/network/Authenticator.js

@@ -29,8 +29,12 @@ const {ReqPqMultiRequest} = require("../tl/functions");
 async function doAuthentication(sender) {
 
     // Step 1 sending: PQ Request, endianness doesn't matter since it's random
-    let nonce = Helpers.generateRandomBytes(16);
-    let resPQ = await sender.send(ReqPqMultiRequest(nonce));
+    let nonce = BigIntBuffer.toBigIntLE(Helpers.generateRandomBytes(16));
+    console.log(nonce);
+    process.exit(0);
+    await sender.send(new ReqPqMultiRequest({nonce: nonce}));
+
+    let resPQ = await sender.receive();
     if (!(resPQ instanceof ResPQ)) {
         throw new Error(`Step 1 answer was ${resPQ}`)
     }

+ 57 - 66
gramjs/network/MTProtoSender.js

@@ -1,13 +1,31 @@
 const MtProtoPlainSender = require("./MTProtoPlainSender");
+const MTProtoState = require("./MTProtoState");
 const Helpers = require("../utils/Helpers");
-const {MsgsAck} = require("../gramjs/tl/types");
+const {MsgsAck} = require("../tl/types");
+const AuthKey = require("../crypto/AuthKey");
 const AES = require("../crypto/AES");
-const {RPCError} = require("../errors");
+const {RPCError} = require("../errors/RPCBaseErrors");
+const RPCResult = require("../tl/core/RPCResult");
+const MessageContainer = require("../tl/core/MessageContainer");
+const GZIPPacked = require("../tl/core/GZIPPacked");
+const TLMessage = require("../tl/core/TLMessage");
+
 const format = require('string-format');
 const {TypeNotFoundError} = require("../errors");
 const {BadMessageError} = require("../errors");
 const {InvalidDCError} = require("../errors");
 const {gzip, ungzip} = require('node-gzip');
+const MessagePacker = require("../extensions/MessagePacker");
+const Pong = require("../tl/core/GZIPPacked");
+const BadServerSalt = require("../tl/core/GZIPPacked");
+const BadMsgNotification = require("../tl/core/GZIPPacked");
+const MsgDetailedInfo = require("../tl/core/GZIPPacked");
+const MsgNewDetailedInfo = require("../tl/core/GZIPPacked");
+const NewSessionCreated = require("../tl/core/GZIPPacked");
+const FutureSalts = require("../tl/core/GZIPPacked");
+const MsgsStateReq = require("../tl/core/GZIPPacked");
+const MsgResendReq = require("../tl/core/GZIPPacked");
+const MsgsAllInfo = require("../tl/core/GZIPPacked");
 //const {tlobjects} = require("../gramjs/tl/alltlobjects");
 format.extend(String.prototype, {});
 
@@ -42,7 +60,7 @@ class MTProtoSender {
     }) {
         this._connection = null;
         this._loggers = opt.loggers;
-        this._log = opt.loggers[__name__];
+        this._log = opt.loggers;
         this._retries = opt.retries;
         this._delay = opt.delay;
         this._autoReconnect = opt.autoReconnect;
@@ -72,8 +90,8 @@ class MTProtoSender {
         /**
          * Preserving the references of the AuthKey and state is important
          */
-        this.auth_key = authKey || new AuthKey(null);
-        this._state = new MTProtoState(this.auth_key, this._loggers);
+        this.authKey = authKey || new AuthKey(null);
+        this._state = new MTProtoState(this.authKey, this._loggers);
 
         /**
          * Outgoing messages are put in a queue and sent in a batch.
@@ -104,20 +122,20 @@ class MTProtoSender {
          */
 
         this._handlers = {
-            [RPCResult.CONSTRUCTOR_ID]: this._handle_rpc_result,
-            [MessageContainer.CONSTRUCTOR_ID]: this._handle_container,
-            [GzipPacked.CONSTRUCTOR_ID]: this._handle_gzip_packed,
-            [Pong.CONSTRUCTOR_ID]: this._handle_pong,
-            [BadServerSalt.CONSTRUCTOR_ID]: this._handle_bad_server_salt,
-            [BadMsgNotification.CONSTRUCTOR_ID]: this._handle_bad_notification,
-            [MsgDetailedInfo.CONSTRUCTOR_ID]: this._handle_detailed_info,
-            [MsgNewDetailedInfo.CONSTRUCTOR_ID]: this._handle_new_detailed_info,
-            [NewSessionCreated.CONSTRUCTOR_ID]: this._handle_new_session_created,
-            [MsgsAck.CONSTRUCTOR_ID]: this._handle_ack,
-            [FutureSalts.CONSTRUCTOR_ID]: this._handle_future_salts,
-            [MsgsStateReq.CONSTRUCTOR_ID]: this._handle_state_forgotten,
-            [MsgResendReq.CONSTRUCTOR_ID]: this._handle_state_forgotten,
-            [MsgsAllInfo.CONSTRUCTOR_ID]: this._handle_msg_all,
+            [RPCResult.CONSTRUCTOR_ID]: this._handleRPCResult,
+            [MessageContainer.CONSTRUCTOR_ID]: this._handleContainer,
+            [GZIPPacked.CONSTRUCTOR_ID]: this._handleGzipPacked,
+            [Pong.CONSTRUCTOR_ID]: this._handlePong,
+            [BadServerSalt.CONSTRUCTOR_ID]: this._handleBadServerSalt,
+            [BadMsgNotification.CONSTRUCTOR_ID]: this._handleBadNotification,
+            [MsgDetailedInfo.CONSTRUCTOR_ID]: this._handleDetailedInfo,
+            [MsgNewDetailedInfo.CONSTRUCTOR_ID]: this._handleNewDetailedInfo,
+            [NewSessionCreated.CONSTRUCTOR_ID]: this._handleNewSessionCreated,
+            [MsgsAck.CONSTRUCTOR_ID]: this._handleAck,
+            [FutureSalts.CONSTRUCTOR_ID]: this._handleFutureSalts,
+            [MsgsStateReq.CONSTRUCTOR_ID]: this._handleStateForgotten,
+            [MsgResendReq.CONSTRUCTOR_ID]: this._handleStateForgotten,
+            [MsgsAllInfo.CONSTRUCTOR_ID]: this._handleMsgAll,
         }
 
 
@@ -135,8 +153,10 @@ class MTProtoSender {
             this._log.info('User is already connected!');
             return false;
         }
+        console.log("connecting sender");
         this._connection = connection;
         await this._connect();
+        console.log("finished connecting sender");
         this._user_connected = true;
         return true;
     }
@@ -200,14 +220,14 @@ class MTProtoSender {
      * @private
      */
     async _connect() {
-        this._log.info('Connecting to {0}...'.replace("{0}", this._connection));
+        //this._log.info('Connecting to {0}...'.replace("{0}", this._connection));
         await this._connection.connect();
 
-        this._log.debug("Connection success!");
-        if (!this.auth_key) {
+        //this._log.debug("Connection success!");
+        if (!this.authKey) {
             let plain = new MtProtoPlainSender(this._connection, this._loggers);
             let res = await authenticator.do_authentication(plain);
-            this.auth_key.key = res.key;
+            this.authKey.key = res.key;
             this._state.time_offset = res.timeOffset;
 
             /**
@@ -217,22 +237,22 @@ class MTProtoSender {
              * switch to different data centers.
              */
             if (this._authKeyCallback) {
-                this._authKeyCallback(this.auth_key)
+                this._authKeyCallback(this.authKey)
 
             }
 
         }
-        this._log.debug('Starting send loop');
+        //this._log.debug('Starting send loop');
         this._send_loop_handle = this._send_loop();
 
-        this._log.debug('Starting receive loop');
+        //this._log.debug('Starting receive loop');
         this._recv_loop_handle = this._recv_loop();
 
         // _disconnected only completes after manual disconnection
         // or errors after which the sender cannot continue such
         // as failing to reconnect or any unexpected error.
 
-        this._log.info('Connection to %s complete!', this._connection)
+        //this._log.info('Connection to %s complete!', this._connection)
 
     }
 
@@ -292,11 +312,11 @@ class MTProtoSender {
         let body, message;
 
         while (this._user_connected && !this._reconnecting) {
-            this._log.debug('Receiving items from the network...');
+            //this._log.debug('Receiving items from the network...');
             try {
                 body = await this._connection.recv();
             } catch (e) {
-                this._log.info('Connection closed while receiving data');
+                //this._log.info('Connection closed while receiving data');
                 return
             }
             try {
@@ -306,21 +326,21 @@ class MTProtoSender {
 
                 if (e instanceof TypeNotFoundError) {
                     // Received object which we don't know how to deserialize
-                    this._log.info(`Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`);
+                    //this._log.info(`Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`);
                 } else if (e instanceof SecurityError) {
                     // A step while decoding had the incorrect data. This message
                     // should not be considered safe and it should be ignored.
-                    this._log.warning(`Security error while unpacking a received message: ${e}`);
+                    //this._log.warning(`Security error while unpacking a received message: ${e}`);
                     continue
                 } else if (e instanceof InvalidBufferError) {
-                    this._log.info('Broken authorization key; resetting');
-                    this.auth_key.key = null;
+                    //this._log.info('Broken authorization key; resetting');
+                    this.authKey.key = null;
                     if (this._authKeyCallback) {
                         this._authKeyCallback(null)
                     }
                     return
                 } else {
-                    this._log.exception('Unhandled error while receiving data');
+                    //this._log.exception('Unhandled error while receiving data');
                     return
                 }
             }
@@ -328,8 +348,8 @@ class MTProtoSender {
         try {
             await this._processMessage(message)
         } catch (e) {
-            this._log.exception('Unhandled error while receiving data');
-
+            //this._log.exception('Unhandled error while receiving data');
+            console.log(e);
         }
 
     }
@@ -658,7 +678,7 @@ class MTProtoSender {
      * @returns {Promise<void>}
      * @private
      */
-    async _handleMsgAll(message){
+    async _handleMsgAll(message) {
 
     }
 
@@ -706,35 +726,6 @@ class MTProtoSender {
         }
     }
 
-    /**
-     * Sends the specified MTProtoRequest, previously sending any message
-     * which needed confirmation. This also pauses the updates thread
-     * @param request {MTProtoRequest}
-     * @param resend
-     */
-    async send(request, resend = false) {
-        let buffer;
-        //If any message needs confirmation send an AckRequest first
-        if (this.needConfirmation.length) {
-            let msgsAck = new MsgsAck(
-                {
-                    msgIds:
-                    this.needConfirmation
-                });
-
-            buffer = msgsAck.onSend();
-            await this.sendPacket(buffer, msgsAck);
-            this.needConfirmation.length = 0;
-        }
-        //Finally send our packed request
-
-        buffer = request.onSend();
-        await this.sendPacket(buffer, request);
-
-        //And update the saved session
-        this.session.save();
-
-    }
 
     /**
      *

+ 1 - 1
gramjs/network/RequestState.js

@@ -5,7 +5,7 @@ class RequestState {
         this.containerId = null;
         this.msgId = null;
         this.request = request;
-        this.data = request.toBuffer();
+        this.data = request.bytes;
         this.after = after
 
     }

+ 0 - 74
gramjs/network/TCPClient.js

@@ -1,74 +0,0 @@
-const Socket = require("net").Socket;
-const sleep = require("../utils/Helpers").sleep;
-
-class TCPClient {
-    constructor() {
-        this.connected = false;
-        this.socket = new Socket();
-        this.canceled = false;
-        this.delay = 100;
-    }
-
-    /**
-     * Connects to the specified IP and port number
-     * @param ip
-     * @param port
-     */
-    async connect(ip, port) {
-        this.socket.connect({host: ip, port: port});
-        this.connected = true;
-
-    }
-
-    /**
-     * Closes the connection
-     */
-    async close() {
-        this.socket.destroy();
-        this.connected = true;
-    }
-
-    /**
-     * Writes (sends) the specified bytes to the connected peer
-     * @param data
-     */
-    async write(data) {
-        this.socket.write(data);
-    }
-
-    /**
-     * Reads (receives) the specified bytes from the connected peer
-     * @param bufferSize
-     * @returns {Buffer}
-     */
-    async read(bufferSize) {
-        this.canceled = false;
-        let buffer = Buffer.alloc(0);
-
-        let writtenCount = 0;
-        while (writtenCount < bufferSize) {
-            let leftCount = bufferSize - writtenCount;
-            let partial = this.socket.read(leftCount);
-            if (partial == null) {
-                console.log("sleeping");
-                await sleep(this.delay);
-                continue;
-            }
-            buffer = Buffer.concat([buffer, partial]);
-            writtenCount += buffer.byteLength;
-        }
-
-        return buffer;
-    }
-
-    /**
-     * Cancels the read operation IF it hasn't yet
-     * started, raising a ReadCancelledError
-     */
-    cancelRead() {
-        this.canceled = true;
-    }
-
-}
-
-module.exports = TCPClient;

+ 0 - 88
gramjs/network/TCPTransport.js

@@ -1,88 +0,0 @@
-const TcpClient = require("./TCPClient");
-const crc = require('crc');
-
-class TCPTransport {
-    constructor(ipAddress, port) {
-        this.tcpClient = new TcpClient();
-        this.sendCounter = 0;
-        this.ipAddress = ipAddress;
-        this.port = port;
-
-    }
-
-    async connect() {
-
-
-        await this.tcpClient.connect(this.ipAddress, this.port);
-    }
-
-    /**
-     * Sends the given packet (bytes array) to the connected peer
-     * Original reference: https://core.telegram.org/mtproto#tcp-transport
-     * The packets are encoded as: total length, sequence number, packet and checksum (CRC32)
-     * @param packet
-     */
-    async send(packet) {
-        if (!this.tcpClient.connected) {
-            throw Error("Client not connected to server.");
-        }
-        let buffer = Buffer.alloc(4 + 4);
-        buffer.writeInt32LE(packet.length + 12, 0);
-        buffer.writeInt32LE(this.sendCounter, 4);
-        buffer = Buffer.concat([buffer, packet]);
-        let tempBuffer = Buffer.alloc(4);
-        tempBuffer.writeUInt32LE(crc.crc32(buffer), 0);
-        buffer = Buffer.concat([buffer, tempBuffer]);
-        await this.tcpClient.write(buffer);
-        this.sendCounter++;
-    }
-
-    /**
-     * Receives a TCP message (tuple(sequence number, body)) from the connected peer
-     * @returns {{body: Buffer, seq: number}}
-     */
-    async receive() {
-        /**First read everything we need**/
-        let packetLengthBytes = await this.tcpClient.read(4);
-        let packetLength = packetLengthBytes.readInt32LE(0);
-        let seqBytes = await this.tcpClient.read(4);
-        let seq = seqBytes.readInt32LE(0);
-        let body = await this.tcpClient.read(packetLength - 12);
-        let checksum = (await this.tcpClient.read(4)).readUInt32LE(0);
-        /**Then perform the checks**/
-        let rv = Buffer.concat([packetLengthBytes, seqBytes, body]);
-        let validChecksum = crc.crc32(rv);
-        if (checksum !== validChecksum) {
-            throw Error("invalid checksum");
-        }
-        /** If we passed the tests, we can then return a valid TCP message**/
-
-        return {seq, body}
-    }
-
-    async close() {
-        if (this.tcpClient.connected) {
-            await this.tcpClient.close();
-        }
-    }
-
-    /**
-     * Cancels (stops) trying to receive from the
-     * remote peer and raises an {Error}
-     */
-    cancelReceive() {
-        this.tcpClient.cancelRead();
-    }
-
-    /**
-     * Gets the client read delay
-     * @returns {number}
-     */
-    getClientDelay() {
-        return this.tcpClient.delay;
-    }
-
-
-}
-
-module.exports = TCPTransport;

+ 149 - 0
gramjs/network/connection/Connection.js

@@ -0,0 +1,149 @@
+const Socket = require("net").Socket;
+const Helpers = require("../../utils/Helpers");
+
+/**
+ * The `Connection` class is a wrapper around ``asyncio.open_connection``.
+ *
+ * Subclasses will implement different transport modes as atomic operations,
+ * which this class eases doing since the exposed interface simply puts and
+ * gets complete data payloads to and from queues.
+ *
+ * The only error that will raise from send and receive methods is
+ * ``ConnectionError``, which will raise when attempting to send if
+ * the client is disconnected (includes remote disconnections).
+ */
+class Connection {
+    packetCodec = null;
+
+    constructor(ip, port, dcId, loggers) {
+        this._ip = ip;
+        this._port = port;
+        this._dcId = dcId;
+        this._log = loggers;
+        this._reader = null;
+        this._writer = null;
+        this._connected = false;
+        this._sendTask = null;
+        this._recvTask = null;
+        this._codec = null;
+        this._obfuscation = null;  // TcpObfuscated and MTProxy
+        this._sendArray = [];
+        this._recvArray = [];
+        this.socket = new Socket();
+
+    }
+
+    async _connect() {
+        console.log("trying to connect sock");
+        console.log("ip is ", this._ip);
+        console.log("port is ", this._port);
+        await this.socket.connect({host: this._ip, port: this._port});
+        this._codec = new this.packetCodec(this);
+        this._initConn();
+        console.log("finished init");
+    }
+
+    async connect() {
+        console.log("TCP connecting");
+        await this._connect();
+        this._connected = true;
+        console.log("finished first connect");
+        this._sendTask = this._sendLoop();
+        this._recvTask = this._recvLoop();
+        console.log("finsihed TCP connecting");
+    }
+
+    async disconnect() {
+        this._connected = false;
+        this.socket.close();
+    }
+
+    async send(data) {
+        console.log(this._sendArray);
+        if (!this._connected) {
+            throw new Error("Not connected");
+        }
+        while (this._sendArray.length !== 0) {
+            await Helpers.sleep(100);
+        }
+        console.log("will send",data);
+        this._sendArray.push(data);
+    }
+
+    async recv() {
+        while (this._connected) {
+            while (this._recvArray.length === 0) {
+                await Helpers.sleep(100);
+            }
+            let result = await this._recvArray.pop();
+
+            if (result) { // null = sentinel value = keep trying
+                return result
+            }
+        }
+        throw new Error("Not connected");
+    }
+
+    async _sendLoop() {
+        // TODO handle errors
+        while (this._connected) {
+            while (this._sendArray.length === 0) {
+                console.log("sleeping");
+                await Helpers.sleep(1000);
+            }
+            await this._send(this._sendArray.pop());
+
+        }
+    }
+
+    async _recvLoop() {
+        while (this._connected) {
+
+            while (this._recvArray.length === 0) {
+                await Helpers.sleep(1000);
+            }
+            let data = await this._recv();
+
+            this._recvArray.push(data);
+        }
+    }
+
+    async _initConn() {
+        if (this._codec.tag) {
+            console.log("writing codec");
+            this.socket.write(this._codec.tag);
+        }
+    }
+
+    async _send(data) {
+        console.log("sending ", data);
+        await this.socket.write(this._codec.encodePacket(data));
+    }
+
+    async _recv() {
+        console.log("receiving");
+        return await this._codec.readPacket(this.socket);
+    }
+}
+
+class PacketCodec {
+    constructor(connection) {
+        this._conn = connection;
+    }
+
+    encodePacket(data) {
+        throw new Error("Not Implemented")
+
+        //Override
+    }
+
+    async readPacket(reader) {
+        //override
+        throw new Error("Not Implemented")
+    }
+}
+
+module.exports = {
+    Connection,
+    PacketCodec
+};

+ 54 - 0
gramjs/network/connection/TCPFull.js

@@ -0,0 +1,54 @@
+const {Connection, PacketCodec} = require("./Connection");
+const struct = require("python-struct");
+const {crc32} = require("crc");
+const {InvalidChecksumError} = require("../../errors/Common");
+const Socket = require("net").Socket;
+
+class FullPacketCodec extends PacketCodec {
+    constructor(connection) {
+        super(connection);
+        this._sendCounter = 0; // Telegram will ignore us otherwise
+    }
+
+    encodePacket(data) {
+        // https://core.telegram.org/mtproto#tcp-transport
+        // total length, sequence number, packet and checksum (CRC32)
+        let length = data.length + 12;
+        data = struct.pack('<ii', length, this._sendCounter) + data;
+        let crc = struct.pack('<I', crc32(data));
+        this._sendCounter += 1;
+        return data + crc;
+    }
+
+    /**
+     *
+     * @param reader {Socket}
+     * @returns {Promise<*>}
+     */
+    async readPacket(reader)
+    {
+        let packetLenSeq = await reader.read(8); // 4 and 4
+
+        console.log("packet length", packetLenSeq);
+        let res = struct.unpack("<ii", packetLenSeq);
+        let packetLen = res[0];
+        let seq = res[1];
+        let body = await reader.read(packetLen - 8);
+        let checksum = struct.unpack("<I", body.slice(-4))[0];
+
+        let validChecksum = crc32(packetLen + body);
+        if (!(validChecksum.equals(checksum))) {
+            throw new InvalidChecksumError(checksum, validChecksum);
+        }
+        return body;
+    }
+}
+
+class ConnectionTCPFull extends Connection {
+    packetCodec = FullPacketCodec;
+}
+
+module.exports = {
+    FullPacketCodec,
+    ConnectionTCPFull
+};

+ 0 - 0
gramjs/network/connection/index.js


+ 0 - 0
gramjs/network/index.js


+ 37 - 49
gramjs/tl/TelegramClient.js

@@ -2,75 +2,63 @@ const Session = require("./Session");
 const doAuthentication = require("../network/Authenticator");
 const MtProtoSender = require("../network/mtprotoSender");
 const MTProtoRequest = require("../tl/MTProtoRequest");
-const TcpTransport = require("../network/TCPTransport");
-
-const {InvokeWithLayerRequest, InitConnectionRequest} = require("../gramjs/tl/functions/index");
-const {GetConfigRequest} = require("../gramjs/tl/functions/help");
+const {ConnectionTCPFull} = require("../network/connection/TCPFull");
+const {InvokeWithLayerRequest, InitConnectionRequest} = require("./functions/index");
+const {GetConfigRequest} = require("./functions/help");
+const {LAYER} = require("../tl/alltlobjects");
 
 class TelegramClient {
 
-    constructor(sessionUserId, layer, apiId, apiHash) {
+    constructor(sessionUserId, apiId, apiHash, connection = ConnectionTCPFull) {
         if (apiId === undefined || apiHash === undefined) {
             throw Error("Your API ID or Hash are invalid. Please read \"Requirements\" on README.md");
         }
+
         this.apiId = apiId;
         this.apiHash = apiHash;
-
-        this.layer = layer;
-
+        this._connection = ConnectionTCPFull;
+        this._initWith = (x) => {
+            return new InvokeWithLayerRequest({
+                layer: LAYER,
+                query: new InitConnectionRequest({
+                    apiId: this.apiId,
+                    deviceModel: "Windows",
+                    systemVersion: "1.8.3",
+                    appVersion: "1.8",
+                    langCode: "en",
+                    systemLangCode: "en",
+                    query: x,
+                    proxy: null,
+                })
+            })
+        };
         this.session = Session.tryLoadOrCreateNew(sessionUserId);
-        this.transport = new TcpTransport(this.session.serverAddress, this.session.port);
         //These will be set later
         this.dcOptions = null;
-        this.sender = null;
+        this._sender = new MtProtoSender(this.session.authKey);
         this.phoneCodeHashes = Array();
 
     }
 
+
     /**
      * Connects to the Telegram servers, executing authentication if required.
      * Note that authenticating to the Telegram servers is not the same as authenticating
      * the app, which requires to send a code first.
-     * @param reconnect {Boolean}
-     * @returns {Promise<Boolean>}
+     * @returns {Promise<void>}
      */
-    async connect(reconnect = false) {
-        await this.transport.connect();
-        try {
-            if (!this.session.authKey || reconnect) {
-                let res = await doAuthentication(this.transport);
-                console.log("authenticated");
-                this.session.authKey = res.authKey;
-                this.session.timeOffset = res.timeOffset;
-                this.session.save();
-            }
-            this.sender = new MtProtoSender(this.transport, this.session);
-            let r = await this.invoke(new GetConfigRequest());
-            console.log(r);
-            process.exit(0)
-            // Now it's time to send an InitConnectionRequest
-            // This must always be invoked with the layer we'll be using
-            let query = new InitConnectionRequest({
-                apiId: this.apiId,
-                deviceModel: "PlaceHolder",
-                systemVersion: "PlaceHolder",
-                appVersion: "0.0.1",
-                langCode: "en",
-                query: new GetConfigRequest()
-            });
-            let result = await this.invoke(new InvokeWithLayerRequest({
-                layer: this.layer,
-                query: query
-            }));
-            // We're only interested in the DC options
-            // although many other options are available!
-            this.dcOptions = result.dcOptions;
-            return true;
-        } catch (error) {
-            console.log('Could not stabilise initial connection: {}'.replace("{}", error));
-            console.log(error.stack);
-            return false;
+    async connect() {
+        let connection = new this._connection(this.session.serverAddress, this.session.port, this.session.dcId, null);
+        if (!await this._sender.connect(connection)) {
+            return;
         }
+        console.log("ok");
+        this.session.authKey = this._sender.authKey;
+        await this.session.save();
+        await this._sender.send(this._initWith(
+            new GetConfigRequest()
+        ));
+
     }
 
     /**
@@ -94,7 +82,7 @@ class TelegramClient {
         this.session.server_address = dc.ipAddress;
         this.session.port = dc.port;
         this.session.save();
-        await this.connect(true);
+        await this.connect();
     }
 
     /**

+ 41 - 0
gramjs/tl/core/GZIPPacked.js

@@ -0,0 +1,41 @@
+const {TLObject} = require("../tlobject");
+const struct = require("python-struct");
+const {ungzip} = require("node-gzip");
+const {gzip} = require("node-gzip");
+
+class GZIPPacked extends TLObject {
+    static CONSTRUCTOR_ID = 0x3072cfa1;
+
+    constructor(data) {
+        super();
+        this.data = data;
+    }
+
+    async GZIPIfSmaller(contentRelated, data) {
+        if (contentRelated && data.length > 512) {
+            let gzipped = await (new GZIPPacked(data)).toBytes();
+        }
+        return data;
+    }
+
+    async toBytes() {
+        return Buffer.concat([
+            struct.pack("<I", GZIPPacked.CONSTRUCTOR_ID),
+            TLObject.serializeBytes(await gzip(this.data))
+        ])
+    }
+
+    static async read(reader) {
+        let constructor = reader.readInt(false);
+        if (constructor !== GZIPPacked.CONSTRUCTOR_ID) {
+            throw new Error("not equal");
+        }
+        return await gzip(reader.tgReadBytes());
+    }
+
+    static async fromReader(reader) {
+        return new GZIPPacked(await ungzip(reader.tgReadBytes()));
+    }
+
+}
+module.exports = GZIPPacked;

+ 44 - 0
gramjs/tl/core/MessageContainer.js

@@ -0,0 +1,44 @@
+const {TLObject} = require("../tlobject");
+const struct = require("python-struct");
+const {TLMessage} = require("./TLMessage");
+
+class MessageContainer extends TLObject {
+    static CONSTRUCTOR_ID = 0x73f1f8dc;
+
+    // Maximum size in bytes for the inner payload of the container.
+    // Telegram will close the connection if the payload is bigger.
+    // The overhead of the container itself is subtracted.
+    static MAXIMUM_SIZE = 1044456 - 8;
+
+    // Maximum amount of messages that can't be sent inside a single
+    // container, inclusive. Beyond this limit Telegram will respond
+    // with BAD_MESSAGE 64 (invalid container).
+    //
+    // This limit is not 100% accurate and may in some cases be higher.
+    // However, sending up to 100 requests at once in a single container
+    // is a reasonable conservative value, since it could also depend on
+    // other factors like size per request, but we cannot know this.
+    static MAXIMUM_LENGTH = 100;
+
+    constructor(messages) {
+        super();
+        this.messages = messages;
+    }
+
+    static async fromReader(reader) {
+        let messages = [];
+        for (let x of reader.readInt()) {
+            let msgId = reader.readInt();
+            let seqNo = reader.readInt();
+            let length = reader.readInt();
+            let before = reader.tellPosition();
+            let obj = reader.tgReadObject();
+            reader.setPosition(before + length);
+            messages.push(new TLMessage(msgId, seqNo, obj))
+        }
+        return new MessageContainer(messages);
+    }
+
+}
+
+module.exports = MessageContainer;

+ 34 - 0
gramjs/tl/core/RPCResult.js

@@ -0,0 +1,34 @@
+const {TLObject} = require("../tlobject");
+const {RpcError} = require("../types")
+const GZIPPacked = require("./GZIPPacked");
+
+class RPCResult extends TLObject {
+    static CONSTRUCTOR_ID = 0xf35c6d01;
+
+    constructor(reqMsgId, body, error) {
+        super();
+        this.reqMsgId = reqMsgId;
+        this.body = body;
+        this.error = error;
+    }
+
+    static async fromReader(reader) {
+        let msgId = reader.readInt();
+        let innerCode = reader.readInt(false);
+        if (innerCode === RpcError.CONSTRUCTOR_ID) {
+            return RPCResult(msgId, null, RpcError.fromReader(reader));
+        }
+        if (innerCode === GZIPPacked.CONSTRUCTOR_ID) {
+            return RPCResult(msgId, (await GZIPPacked.fromReader(reader)).data)
+        }
+        reader.seek(-4);
+        // This reader.read() will read more than necessary, but it's okay.
+        // We could make use of MessageContainer's length here, but since
+        // it's not necessary we don't need to care about it.
+        return new RPCResult(msgId, reader.read(), null)
+
+    }
+
+}
+
+module.exports = RPCResult;

+ 14 - 0
gramjs/tl/core/TLMessage.js

@@ -0,0 +1,14 @@
+const {TLObject} = require("../tlobject");
+
+class TLMessage extends TLObject {
+    static SIZE_OVERHEAD = 12;
+
+    constructor(msgId, seqNo, obj) {
+        super();
+        this.msgId = msgId;
+        this.seqNo = seqNo;
+        this.obj = obj;
+    }
+}
+
+module.exports = TLMessage;

+ 0 - 1
gramjs/utils/Helpers.js

@@ -299,4 +299,3 @@ class Helpers {
 
 module.exports = Helpers;
 
-console.log(Helpers.sha256("ok"));

+ 13 - 1
gramjs_generator/generators/errors.js

@@ -67,7 +67,7 @@ const generateErrors = (errors, f) => {
         f.write('    }\n}\n');
     }
 
-    f.write('\n\nconst rpcErrorsDict = {\n');
+    f.write('\n\nconst rpcErrorsObject = {\n');
 
     for (const error of exactMatch) {
         f.write(`    ${error.pattern}: ${error.name},\n`);
@@ -80,6 +80,18 @@ const generateErrors = (errors, f) => {
     }
 
     f.write('];');
+    f.write("module.exports = {");
+    for (const error of regexMatch) {
+        f.write(`     ${error.name},\n`);
+    }
+    for (const error of exactMatch) {
+        f.write(`     ${error.name},\n`);
+    }
+    f.write("     rpcErrorsObject,\n");
+    f.write("     rpcErrorRe,\n");
+
+    f.write("}");
+
 };
 
 module.exports = {

+ 2 - 1
gramjs_generator/generators/tlobject.js

@@ -518,6 +518,7 @@ const writeFromReader = (tlobject, builder) => {
 
     builder.writeln("static fromReader(reader) {");
     for (const arg of tlobject.args) {
+
         if (arg.name !== "flag") {
 
 
@@ -537,7 +538,7 @@ const writeFromReader = (tlobject, builder) => {
     }
     let temp = [];
     for (let a of tlobject.realArgs) {
-        temp.push(`${a.name}:_${a.name}`)
+        temp.push(`${variableSnakeToCamelCase(a.name)}:_${a.name}`)
     }
     builder.writeln("return this({%s})", temp.join(",\n\t"));
     builder.endBlock();

+ 6 - 4
main.js

@@ -1,9 +1,11 @@
-/**
+const Helpers = require("./gramjs/utils/Helpers");
+const TelegramClient = require("./gramjs/tl/TelegramClient");
 (async function () {
     console.log("Loading interactive example...");
-    let settings = await Helpers.loadSettings();
-    let client = new TelegramClient(settings["session_name"], 73, settings["api_id"], settings["api_hash"]);
+    let sessionName = "anon";
+    let apiId = 17349;
+    let apiHash = "344583e45741c457fe1862106095a5eb";
+    let client = new TelegramClient(sessionName, 73, apiId, apiHash);
     await client.connect();
     console.log("You should now be connected.");
 })();
-*/