Bladeren bron

[Perf] GramJs: Add parallel uploads (#659)

painor 4 jaren geleden
bovenliggende
commit
efc2694ede
4 gewijzigde bestanden met toevoegingen van 276 en 3 verwijderingen
  1. 170 0
      src/api/gramjs/methods/client.ts
  2. 10 1
      src/api/gramjs/methods/media.ts
  3. 31 0
      src/config.ts
  4. 65 2
      src/lib/gramjs/client/uploadFile.ts

+ 170 - 0
src/api/gramjs/methods/client.ts

@@ -0,0 +1,170 @@
+import {
+  TelegramClient, sessions, Api as GramJs, connection,
+} from '../../../lib/gramjs';
+import { Logger as GramJsLogger } from '../../../lib/gramjs/extensions/index';
+
+import { ApiMediaFormat, ApiOnProgress } from '../../types';
+
+import { DEBUG, DEBUG_GRAMJS, UPLOAD_WORKERS } from '../../../config';
+import {
+  onRequestPhoneNumber, onRequestCode, onRequestPassword, onRequestRegistration,
+  onAuthError, onAuthReady, onCurrentUserUpdate, onRequestQrCode,
+} from './auth';
+import { setUpdaterCurrentUserId, updater, handleError } from '../updater';
+import downloadMediaWithClient from './media';
+import { buildApiUserFromFull } from '../apiBuilders/users';
+import localDb from '../localDb';
+
+GramJsLogger.setLevel(DEBUG_GRAMJS ? 'debug' : 'warn');
+
+const gramJsUpdateEventBuilder = { build: (update: object) => update };
+
+let client: TelegramClient;
+let isConnected = false;
+
+export async function init(sessionId: string) {
+  if (DEBUG) {
+    // eslint-disable-next-line no-console
+    console.log('>>> START INIT API 2');
+  }
+
+  const session = new sessions.CacheApiSession(sessionId);
+  client = new TelegramClient(
+    session,
+    process.env.TELEGRAM_T_API_ID,
+    process.env.TELEGRAM_T_API_HASH,
+    { useWSS: true } as any,
+  );
+
+  client.addEventHandler(onUpdate, gramJsUpdateEventBuilder);
+  client.addEventHandler(updater, gramJsUpdateEventBuilder);
+
+  try {
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.log('[GramJs/client] CONNECTING');
+    }
+
+    await client.start({
+      phoneNumber: onRequestPhoneNumber,
+      phoneCode: onRequestCode,
+      password: onRequestPassword,
+      firstAndLastNames: onRequestRegistration,
+      qrCode: onRequestQrCode,
+      onError: onAuthError,
+    });
+
+    const newSessionId = await session.save();
+
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.log('[GramJs/client] CONNECTED as ', newSessionId);
+    }
+
+    onAuthReady(newSessionId);
+    void fetchCurrentUser();
+  } catch (err) {
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.log('[GramJs/client] CONNECTING ERROR', err);
+    }
+
+    throw err;
+  }
+}
+
+export async function destroy() {
+  await client.destroy();
+}
+
+function onUpdate(update: any) {
+  if (update instanceof connection.UpdateConnectionState) {
+    isConnected = update.state === connection.UpdateConnectionState.states.connected;
+  }
+}
+
+export async function invokeRequest<T extends GramJs.AnyRequest>(
+  request: T,
+  shouldHandleUpdates = false,
+): Promise<T['__response'] | undefined> {
+  if (!isConnected) {
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.warn(`[GramJs/client] INVOKE ${request.className} ERROR: Client is not connected`);
+    }
+
+    return undefined;
+  }
+
+  try {
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.log(`[GramJs/client] INVOKE ${request.className}`);
+    }
+
+    const result = await client.invoke(request);
+
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.log(`[GramJs/client] INVOKE RESPONSE ${request.className}`, result);
+    }
+
+    if (shouldHandleUpdates) {
+      if (result instanceof GramJs.Updates || result instanceof GramJs.UpdatesCombined) {
+        result.updates.forEach((update) => updater(update, request));
+      } else if (result instanceof GramJs.UpdatesTooLong) {
+        // TODO Implement
+      } else {
+        updater(result as GramJs.TypeUpdates, request);
+      }
+    }
+
+    return result;
+  } catch (err) {
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.log(`[GramJs/client] INVOKE ERROR ${request.className}`);
+      // eslint-disable-next-line no-console
+      console.error(err);
+    }
+
+    const isSlowMode = err.message.startsWith('A wait of') && (
+      request instanceof GramJs.messages.SendMessage
+      || request instanceof GramJs.messages.SendMedia
+      || request instanceof GramJs.messages.SendMultiMedia
+    );
+
+    handleError({
+      ...err,
+      isSlowMode,
+    });
+    return undefined;
+  }
+}
+
+export function downloadMedia(
+  args: { url: string; mediaFormat: ApiMediaFormat; start?: number; end?: number },
+  onProgress?: ApiOnProgress,
+) {
+  return downloadMediaWithClient(args, client, isConnected, onProgress);
+}
+
+export function uploadFile(file: File, onProgress?: ApiOnProgress) {
+  return client.uploadFile({ file, onProgress, workers: UPLOAD_WORKERS });
+}
+
+export async function fetchCurrentUser() {
+  const userFull = await invokeRequest(new GramJs.users.GetFullUser({
+    id: new GramJs.InputUserSelf(),
+  }));
+
+  if (!userFull || !(userFull.user instanceof GramJs.User)) {
+    return;
+  }
+
+  localDb.users[userFull.user.id] = userFull.user;
+  const currentUser = buildApiUserFromFull(userFull);
+
+  setUpdaterCurrentUserId(currentUser.id);
+  onCurrentUserUpdate(currentUser);
+}

+ 10 - 1
src/api/gramjs/methods/media.ts

@@ -5,7 +5,9 @@ import {
   ApiMediaFormat, ApiOnProgress, ApiParsedMedia, ApiPreparedMedia,
 } from '../../types';
 
-import { MEDIA_CACHE_DISABLED, MEDIA_CACHE_MAX_BYTES, MEDIA_CACHE_NAME } from '../../../config';
+import {
+  DOWNLOAD_WORKERS, MEDIA_CACHE_DISABLED, MEDIA_CACHE_MAX_BYTES, MEDIA_CACHE_NAME,
+} from '../../../config';
 import localDb from '../localDb';
 import { getEntityTypeById } from '../gramjsBuilders';
 import { blobToDataUri } from '../../../util/files';
@@ -16,9 +18,12 @@ type EntityType = 'msg' | 'sticker' | 'gif' | 'channel' | 'chat' | 'user' | 'sti
 =======
 type EntityType = 'msg' | 'sticker' | 'wallpaper' | 'gif' | 'channel' | 'chat' | 'user' | 'stickerSet';
 
+<<<<<<< HEAD
 const MAX_WORKERS = 16;
 >>>>>>> 90fbcf87... GramJS: Various fixes for download and floodwaits (#623)
 
+=======
+>>>>>>> edef29da... [Perf] GramJs: Add parallel uploads (#659)
 export default async function downloadMedia(
   {
     url, mediaFormat, start, end,
@@ -118,11 +123,15 @@ async function download(
 
   if (entityType === 'msg' || entityType === 'sticker' || entityType === 'gif') {
     const data = await client.downloadMedia(entity, {
+<<<<<<< HEAD
 <<<<<<< HEAD
       sizeType, start, end, progressCallback: onProgress, workers: 16,
 =======
       sizeType, start, end, progressCallback: onProgress, workers: MAX_WORKERS,
 >>>>>>> 90fbcf87... GramJS: Various fixes for download and floodwaits (#623)
+=======
+      sizeType, start, end, progressCallback: onProgress, workers: entityType === 'msg' ? DOWNLOAD_WORKERS : 1,
+>>>>>>> edef29da... [Perf] GramJs: Add parallel uploads (#659)
     });
     let mimeType;
     let fullSize;

+ 31 - 0
src/config.ts

@@ -5,6 +5,37 @@ export const MEDIA_CACHE_NAME = 'tt-media';
 
 export const GLOBAL_STATE_CACHE_DISABLED = false;
 export const GLOBAL_STATE_CACHE_KEY = 'tt-global-state';
+<<<<<<< HEAD
+=======
+export const GLOBAL_STATE_CACHE_CHAT_LIST_LIMIT = 20;
+
+export const MEDIA_CACHE_DISABLED = false;
+export const MEDIA_CACHE_NAME = 'tt-media';
+export const MEDIA_PROGRESSIVE_CACHE_DISABLED = false;
+export const MEDIA_PROGRESSIVE_CACHE_NAME = 'tt-media-progressive';
+export const MEDIA_CACHE_MAX_BYTES = 512000; // 512 KB
+export const CUSTOM_BG_CACHE_NAME = 'tt-custom-bg';
+
+export const DOWNLOAD_WORKERS = 16;
+export const UPLOAD_WORKERS = 16;
+
+const isBigScreen = typeof window !== 'undefined' && window.innerHeight >= 900;
+
+export const MESSAGE_LIST_SENSITIVE_AREA = 750;
+export const MESSAGE_LIST_SLICE = isBigScreen ? 50 : 40;
+export const MESSAGE_LIST_VIEWPORT_LIMIT = MESSAGE_LIST_SLICE * 3;
+
+export const CHAT_LIST_SLICE = 20;
+export const CHAT_LIST_LOAD_SLICE = 100;
+export const SHARED_MEDIA_SLICE = 30;
+export const MESSAGE_SEARCH_SLICE = 30;
+export const GLOBAL_SEARCH_SLICE = 20;
+
+export const TOP_CHATS_PRELOAD_LIMIT = 15;
+
+export const DEFAULT_ANIMATION_LEVEL = 2;
+export const DEFAULT_MESSAGE_TEXT_SIZE_PX = 16;
+>>>>>>> edef29da... [Perf] GramJs: Add parallel uploads (#659)
 
 export const GRAMJS_SESSION_ID_KEY = 'GramJs:sessionId';
 export const TDLIB_SESSION_ID_KEY = 'TdLib:sessionId';

+ 65 - 2
src/lib/gramjs/client/uploadFile.ts

@@ -13,6 +13,7 @@ interface OnProgress {
 
 export interface UploadFileParams {
     file: File;
+    workers: number;
     onProgress?: OnProgress;
 }
 
@@ -24,8 +25,9 @@ export async function uploadFile(
     fileParams: UploadFileParams,
 ): Promise<Api.InputFile | Api.InputFileBig> {
     const { file, onProgress } = fileParams;
-    const { name, size } = file;
+    let { workers } = fileParams;
 
+    const { name, size } = file;
     const fileId = readBigIntFromBuffer(generateRandomBytes(8), true, true);
     const isLarge = size > LARGE_FILE_THRESHOLD;
 
@@ -36,10 +38,19 @@ export async function uploadFile(
     // We always upload from the DC we are in.
     const sender = await client._borrowExportedSender(client.session.dcId);
 
+    if (!workers || !size) {
+        workers = 1;
+    }
+    if (workers >= partCount) {
+        workers = partCount;
+    }
+
+    let progress = 0;
     if (onProgress) {
-        onProgress(0);
+        onProgress(progress);
     }
 
+<<<<<<< HEAD
     for (let i = 0; i < partCount; i++) {
         const bytes = buffer.slice(i * partSize, (i + 1) * partSize);
         const result = await sender.send(
@@ -63,6 +74,58 @@ export async function uploadFile(
             }
 
             onProgress((i + 1) / partCount);
+=======
+    for (let i = 0; i < partCount; i += workers) {
+        let sendingParts = [];
+        let end = i + workers;
+        if (end > partCount) {
+            end = partCount;
+        }
+
+        for (let j = i; j < end; j++) {
+            const bytes = buffer.slice(j * partSize, (j + 1) * partSize);
+
+            sendingParts.push((async () => {
+                await sender.send(
+                    isLarge
+                        ? new Api.upload.SaveBigFilePart({
+                            fileId,
+                            filePart: j,
+                            fileTotalParts: partCount,
+                            bytes,
+                        })
+                        : new Api.upload.SaveFilePart({
+                            fileId,
+                            filePart: j,
+                            bytes,
+                        }),
+                );
+
+                if (onProgress) {
+                    if (onProgress.isCanceled) {
+                        throw new Error('USER_CANCELED');
+                    }
+
+                    progress += (1 / partCount);
+                    onProgress(progress);
+                }
+            })());
+
+        }
+        try {
+            await Promise.race([
+                await Promise.all(sendingParts),
+                sleep(UPLOAD_TIMEOUT * workers).then(() => Promise.reject(new Error('TIMEOUT'))),
+            ]);
+        } catch (err) {
+            if (err.message === 'TIMEOUT') {
+                console.warn('Upload timeout. Retrying...');
+                i -= workers;
+                continue;
+            }
+
+            throw err;
+>>>>>>> edef29da... [Perf] GramJs: Add parallel uploads (#659)
         }
     }