Quellcode durchsuchen

Add semaphore downloads limit.

painor vor 3 Jahren
Ursprung
Commit
0fe8d155eb
2 geänderte Dateien mit 116 neuen und 101 gelöschten Zeilen
  1. 103 98
      gramjs/client/downloads.ts
  2. 13 3
      gramjs/client/telegramBaseClient.ts

+ 103 - 98
gramjs/client/downloads.ts

@@ -72,124 +72,129 @@ export async function downloadFile(
     inputLocation: Api.TypeInputFileLocation,
     fileParams: DownloadFileParams
 ) {
-    let { partSizeKb, end } = fileParams;
-    const { fileSize, workers = 1 } = fileParams;
-    const { dcId, progressCallback, start = 0 } = fileParams;
-    if (end && fileSize) {
-        end = end < fileSize ? end : fileSize - 1;
-    }
-
-    if (!partSizeKb) {
-        partSizeKb = fileSize
-            ? getAppropriatedPartSize(fileSize)
-            : DEFAULT_CHUNK_SIZE;
-    }
-
-    const partSize = partSizeKb * 1024;
-    const partsCount = end ? Math.ceil((end - start) / partSize) : 1;
-
-    if (partSize % MIN_CHUNK_SIZE !== 0) {
-        throw new Error(
-            `The part size must be evenly divisible by ${MIN_CHUNK_SIZE}`
-        );
-    }
+    const [value, release] = await client._semaphore.acquire();
+    try {
+        let { partSizeKb, end } = fileParams;
+        const { fileSize, workers = 1 } = fileParams;
+        const { dcId, progressCallback, start = 0 } = fileParams;
+        if (end && fileSize) {
+            end = end < fileSize ? end : fileSize - 1;
+        }
 
-    client._log.info(`Downloading file in chunks of ${partSize} bytes`);
+        if (!partSizeKb) {
+            partSizeKb = fileSize
+                ? getAppropriatedPartSize(fileSize)
+                : DEFAULT_CHUNK_SIZE;
+        }
 
-    const foreman = new Foreman(workers);
-    const promises: Promise<any>[] = [];
-    let offset = start;
-    // Used for files with unknown size and for manual cancellations
-    let hasEnded = false;
+        const partSize = partSizeKb * 1024;
+        const partsCount = end ? Math.ceil((end - start) / partSize) : 1;
 
-    let progress = 0;
-    if (progressCallback) {
-        progressCallback(progress);
-    }
+        if (partSize % MIN_CHUNK_SIZE !== 0) {
+            throw new Error(
+                `The part size must be evenly divisible by ${MIN_CHUNK_SIZE}`
+            );
+        }
 
-    // Preload sender
-    await client.getSender(dcId);
+        client._log.info(`Downloading file in chunks of ${partSize} bytes`);
 
-    // eslint-disable-next-line no-constant-condition
-    while (true) {
-        let limit = partSize;
-        let isPrecise = false;
+        const foreman = new Foreman(workers);
+        const promises: Promise<any>[] = [];
+        let offset = start;
+        // Used for files with unknown size and for manual cancellations
+        let hasEnded = false;
 
-        if (
-            Math.floor(offset / ONE_MB) !==
-            Math.floor((offset + limit - 1) / ONE_MB)
-        ) {
-            limit = ONE_MB - (offset % ONE_MB);
-            isPrecise = true;
+        let progress = 0;
+        if (progressCallback) {
+            progressCallback(progress);
         }
 
-        await foreman.requestWorker();
+        // Preload sender
+        await client.getSender(dcId);
+
+        // eslint-disable-next-line no-constant-condition
+        while (true) {
+            let limit = partSize;
+            let isPrecise = false;
+
+            if (
+                Math.floor(offset / ONE_MB) !==
+                Math.floor((offset + limit - 1) / ONE_MB)
+            ) {
+                limit = ONE_MB - (offset % ONE_MB);
+                isPrecise = true;
+            }
+
+            await foreman.requestWorker();
+
+            if (hasEnded) {
+                foreman.releaseWorker();
+                break;
+            }
+            // eslint-disable-next-line no-loop-func
+            promises.push(
+                (async (offsetMemo: number) => {
+                    // eslint-disable-next-line no-constant-condition
+                    while (true) {
+                        let sender;
+                        try {
+                            sender = await client.getSender(dcId);
+                            const result = await sender.send(
+                                new Api.upload.GetFile({
+                                    location: inputLocation,
+                                    offset: offsetMemo,
+                                    limit,
+                                    precise: isPrecise || undefined,
+                                })
+                            );
+
+                            if (progressCallback) {
+                                if (progressCallback.isCanceled) {
+                                    throw new Error("USER_CANCELED");
+                                }
+
+                                progress += 1 / partsCount;
+                                progressCallback(progress);
+                            }
 
-        if (hasEnded) {
-            foreman.releaseWorker();
-            break;
-        }
-        // eslint-disable-next-line no-loop-func
-        promises.push(
-            (async (offsetMemo: number) => {
-                // eslint-disable-next-line no-constant-condition
-                while (true) {
-                    let sender;
-                    try {
-                        sender = await client.getSender(dcId);
-                        const result = await sender.send(
-                            new Api.upload.GetFile({
-                                location: inputLocation,
-                                offset: offsetMemo,
-                                limit,
-                                precise: isPrecise || undefined,
-                            })
-                        );
-
-                        if (progressCallback) {
-                            if (progressCallback.isCanceled) {
-                                throw new Error("USER_CANCELED");
+                            if (!end && result.bytes.length < limit) {
+                                hasEnded = true;
                             }
 
-                            progress += 1 / partsCount;
-                            progressCallback(progress);
-                        }
+                            foreman.releaseWorker();
 
-                        if (!end && result.bytes.length < limit) {
-                            hasEnded = true;
-                        }
+                            return result.bytes;
+                        } catch (err) {
+                            if (sender && !sender.isConnected()) {
+                                await sleep(DISCONNECT_SLEEP);
+                                continue;
+                            } else if (err instanceof errors.FloodWaitError) {
+                                await sleep(err.seconds * 1000);
+                                continue;
+                            }
 
-                        foreman.releaseWorker();
+                            foreman.releaseWorker();
 
-                        return result.bytes;
-                    } catch (err) {
-                        if (sender && !sender.isConnected()) {
-                            await sleep(DISCONNECT_SLEEP);
-                            continue;
-                        } else if (err instanceof errors.FloodWaitError) {
-                            await sleep(err.seconds * 1000);
-                            continue;
+                            hasEnded = true;
+                            throw err;
                         }
-
-                        foreman.releaseWorker();
-
-                        hasEnded = true;
-                        throw err;
                     }
-                }
-            })(offset)
-        );
+                })(offset)
+            );
 
-        offset += limit;
+            offset += limit;
 
-        if (end && offset > end) {
-            break;
+            if (end && offset > end) {
+                break;
+            }
         }
+        const results = await Promise.all(promises);
+        const buffers = results.filter(Boolean);
+        const totalLength = end ? end + 1 - start : undefined;
+        return Buffer.concat(buffers, totalLength);
+    } finally {
+        release();
     }
-    const results = await Promise.all(promises);
-    const buffers = results.filter(Boolean);
-    const totalLength = end ? end + 1 - start : undefined;
-    return Buffer.concat(buffers, totalLength);
 }
 
 class Foreman {

+ 13 - 3
gramjs/client/telegramBaseClient.ts

@@ -21,6 +21,7 @@ import {
     ProxyInterface,
     TCPMTProxy,
 } from "../network/connection/TCPMTProxy";
+import { Semaphore } from "async-mutex";
 
 const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec
 const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
@@ -105,6 +106,10 @@ export interface TelegramClientParams {
      * Whether to try to connect over Wss (or 443 port) or not.
      */
     useWSS?: boolean;
+    /**
+     * Limits how many downloads happen at the same time.
+     */
+    maxConcurrentDownloads?: number;
 }
 
 const clientParamsDefault = {
@@ -197,7 +202,7 @@ export abstract class TelegramBaseClient {
     _reconnecting: boolean;
     _destroyed: boolean;
     protected _proxy?: ProxyInterface;
-
+    _semaphore: Semaphore;
     constructor(
         session: string | Session,
         apiId: number,
@@ -234,7 +239,9 @@ export abstract class TelegramBaseClient {
         this._timeout = clientParams.timeout!;
         this._autoReconnect = clientParams.autoReconnect!;
         this._proxy = clientParams.proxy;
-
+        this._semaphore = new Semaphore(
+            clientParams.maxConcurrentDownloads || 1
+        );
         if (!(clientParams.connection instanceof Function)) {
             throw new Error("Connection should be a class not an instance");
         }
@@ -289,7 +296,10 @@ export abstract class TelegramBaseClient {
     set floodSleepThreshold(value: number) {
         this._floodSleepThreshold = Math.min(value || 0, 24 * 60 * 60);
     }
-
+    set maxConcurrentDownloads(value:number) {
+        // @ts-ignore
+        this._semaphore._value = value;
+    }
     // region connecting
     async _initSession() {
         await this.session.load();