1
0
Эх сурвалжийг харах

GramJS: Parallel download; Progressive: Fix caching

painor 5 жил өмнө
parent
commit
f2b3134c31

+ 188 - 0
src/api/gramjs/methods/media.ts

@@ -0,0 +1,188 @@
+import { inflate } from 'pako/dist/pako_inflate';
+
+import { Api as GramJs, TelegramClient } from '../../../lib/gramjs';
+import {
+  ApiMediaFormat, ApiOnProgress, ApiParsedMedia, ApiPreparedMedia,
+} from '../../types';
+
+import { MEDIA_CACHE_DISABLED, MEDIA_CACHE_MAX_BYTES, MEDIA_CACHE_NAME } from '../../../config';
+import localDb from '../localDb';
+import { getEntityTypeById } from '../gramjsBuilders';
+import { blobToDataUri } from '../../../util/files';
+import * as cacheApi from '../../../util/cacheApi';
+
+type EntityType = 'msg' | 'sticker' | 'gif' | 'channel' | 'chat' | 'user' | 'stickerSet';
+
+export default async function downloadMedia(
+  {
+    url, mediaFormat, start, end,
+  }: {
+    url: string; mediaFormat: ApiMediaFormat; start?: number; end?: number;
+  },
+  client: TelegramClient,
+  isConnected: boolean,
+  onProgress?: ApiOnProgress,
+) {
+  const {
+    data, mimeType, fullSize,
+  } = await download(url, client, isConnected, onProgress, start, end, mediaFormat) || {};
+  if (!data) {
+    return undefined;
+  }
+
+  const parsed = await parseMedia(data, mediaFormat, mimeType);
+  if (!parsed) {
+    return undefined;
+  }
+
+  const canCache = mediaFormat !== ApiMediaFormat.Progressive && (
+    mediaFormat !== ApiMediaFormat.BlobUrl || (parsed as Blob).size <= MEDIA_CACHE_MAX_BYTES
+  );
+  if (!MEDIA_CACHE_DISABLED && cacheApi && canCache) {
+    void cacheApi.save(MEDIA_CACHE_NAME, url, parsed);
+  }
+
+  const prepared = prepareMedia(parsed);
+
+  return { prepared, mimeType, fullSize };
+}
+
+async function download(
+  url: string,
+  client: TelegramClient,
+  isConnected: boolean,
+  onProgress?: ApiOnProgress,
+  start?: number,
+  end?: number,
+  mediaFormat?: ApiMediaFormat,
+) {
+  const mediaMatch = url.match(/(avatar|msg|stickerSet|sticker|gif|file)([-\d\w./]+)(\?size=\w+)?/);
+  if (!mediaMatch) {
+    return undefined;
+  }
+
+  if (mediaMatch[1] === 'file') {
+    const response = await fetch(mediaMatch[2]);
+    const data = await response.arrayBuffer();
+    return { data };
+  }
+
+  if (!isConnected) {
+    return Promise.reject(new Error('ERROR: Client is not connected'));
+  }
+
+  let entityType: EntityType;
+  let entityId: string | number = mediaMatch[2];
+  const sizeType = mediaMatch[3] ? mediaMatch[3].replace('?size=', '') : undefined;
+  let entity: (
+    GramJs.User | GramJs.Chat | GramJs.Channel |
+    GramJs.Message | GramJs.Document | GramJs.StickerSet | undefined
+  );
+
+  if (mediaMatch[1] === 'avatar') {
+    entityType = getEntityTypeById(Number(entityId));
+    entityId = Math.abs(Number(entityId));
+  } else {
+    entityType = mediaMatch[1] as 'msg' | 'sticker' | 'gif' | 'stickerSet';
+  }
+
+  switch (entityType) {
+    case 'channel':
+    case 'chat':
+      entity = localDb.chats[entityId as number];
+      break;
+    case 'user':
+      entity = localDb.users[entityId as number];
+      break;
+    case 'msg':
+      entity = localDb.messages[entityId as string];
+      break;
+    case 'sticker':
+    case 'gif':
+      entity = localDb.documents[entityId as string];
+      break;
+    case 'stickerSet':
+      entity = localDb.stickerSets[entityId as string];
+      break;
+  }
+
+  if (!entity) {
+    return undefined;
+  }
+
+  if (entityType === 'msg' || entityType === 'sticker' || entityType === 'gif') {
+    const data = await client.downloadMedia(entity, {
+      sizeType, start, end, progressCallback: onProgress, workers: 16,
+    });
+    let mimeType;
+    let fullSize;
+
+    if (entityType === 'sticker' && sizeType) {
+      mimeType = 'image/webp';
+    } else if (sizeType) {
+      mimeType = 'image/jpeg';
+    } else if (entity instanceof GramJs.Message) {
+      mimeType = getMessageMediaMimeType(entity);
+      if (entity.media instanceof GramJs.MessageMediaDocument && entity.media.document instanceof GramJs.Document) {
+        fullSize = entity.media.document.size;
+      }
+    } else {
+      mimeType = (entity as GramJs.Document).mimeType;
+      fullSize = (entity as GramJs.Document).size;
+    }
+
+    return { mimeType, data, fullSize };
+  } else if (entityType === 'stickerSet') {
+    const data = await client.downloadStickerSetThumb(entity);
+    const mimeType = mediaFormat === ApiMediaFormat.Lottie ? 'application/json' : 'image/jpeg';
+
+    return { mimeType, data };
+  } else {
+    const data = await client.downloadProfilePhoto(entity, sizeType === 'big');
+    const mimeType = 'image/jpeg';
+
+    return { mimeType, data };
+  }
+}
+
+function getMessageMediaMimeType(message: GramJs.Message) {
+  if (!message || !message.media) {
+    return undefined;
+  }
+
+  if (message.media instanceof GramJs.MessageMediaPhoto) {
+    return 'image/jpeg';
+  }
+
+  if (message.media instanceof GramJs.MessageMediaDocument && message.media.document instanceof GramJs.Document) {
+    return message.media.document.mimeType;
+  }
+
+  return undefined;
+}
+
+// eslint-disable-next-line no-async-without-await/no-async-without-await
+async function parseMedia(
+  data: Buffer, mediaFormat: ApiMediaFormat, mimeType?: string,
+): Promise<ApiParsedMedia | undefined> {
+  switch (mediaFormat) {
+    case ApiMediaFormat.DataUri:
+      return blobToDataUri(new Blob([data], { type: mimeType }));
+    case ApiMediaFormat.BlobUrl:
+      return new Blob([data], { type: mimeType });
+    case ApiMediaFormat.Lottie: {
+      const json = inflate(data, { to: 'string' });
+      return JSON.parse(json);
+    }
+  }
+
+  return undefined;
+}
+
+function prepareMedia(mediaData: ApiParsedMedia): ApiPreparedMedia {
+  if (mediaData instanceof Blob) {
+    return URL.createObjectURL(mediaData);
+  }
+
+  return mediaData;
+}

+ 96 - 9
src/lib/gramjs/client/TelegramClient.js

@@ -231,7 +231,7 @@ class TelegramClient {
                 return promise
                     .then((sender) => sender.disconnect())
             })
-        ]);
+        ])
 
         this._eventBuilders = []
     }
@@ -269,8 +269,12 @@ class TelegramClient {
         }
 =======
     removeSender(dcId) {
+<<<<<<< HEAD
         delete this._borrowedSenderPromises[dcId];
 >>>>>>> 6129de89... GramJS: Log out on `InvalidBufferError` (#481)
+=======
+        delete this._borrowedSenderPromises[dcId]
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
     }
 
     async _borrowExportedSender(dcId, retries = 5) {
@@ -341,11 +345,12 @@ class TelegramClient {
      * @param [args[fileSize] {number}]
      * @param [args[progressCallback] {Function}]
      * @param [args[dcId] {number}]
+     * @param [args[workers] {number}]
      * @returns {Promise<Buffer>}
      */
     async downloadFile(inputLocation, args = {}) {
 
-        let { partSizeKb, fileSize } = args
+        let { partSizeKb, fileSize, workers } = args
         const { dcId } = args
 
         if (!partSizeKb) {
@@ -390,6 +395,7 @@ class TelegramClient {
         }
 
         this._log.info(`Downloading file in chunks of ${partSize} bytes`)
+<<<<<<< HEAD
 
         try {
             let offset = 0
@@ -418,22 +424,73 @@ class TelegramClient {
                         continue
                     } else {
                         throw e
-                    }
-                }
-                offset += partSize
+=======
+        if (args.progressCallback) {
+            args.progressCallback(0)
+        }
+
+        if (!workers) {
+            workers = 1
+        }
 
-                if (result.bytes.length) {
-                    this._log.debug(`Saving ${result.bytes.length} more bytes`)
+        try {
+            let limit = partSize
+            let offset = args.start || 0
+            while (true) {
+                let results = []
+                let i = 0
+                while (true) {
+                    let precise = false;
+                    if (Math.floor(offset / ONE_MB) !== Math.floor((offset + limit - 1) / ONE_MB)) {
+                        limit = ONE_MB - offset % ONE_MB
+                        precise = true
+                    }
 
-                    fileWriter.write(result.bytes)
+                    results.push(sender.send(new requests.upload.GetFile({
+                        location: inputLocation,
+                        offset,
+                        limit,
+                        precise
+                    })))
+
+                    offset += partSize
+                    i++
+                    if ((args.end && (offset + partSize) > args.end) || workers === i) {
+                        break
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
+                    }
+                }
+                results = await Promise.all(results)
+                for (const result of results) {
+                    if (result.bytes.length) {
+                        this._log.debug(`Saving ${result.bytes.length} more bytes`)
+
+                        fileWriter.write(result.bytes)
+                        if (args.progressCallback) {
+                            if (args.progressCallback.isCanceled) {
+                                throw new Error('USER_CANCELED')
+                            }
 
+<<<<<<< HEAD
                     if (args.progressCallback) {
                         await args.progressCallback(fileWriter.getValue().length, fileSize)
+=======
+                            const progress = offset / fileSize
+                            args.progressCallback(progress)
+                        }
+                    }
+                    // Last chunk.
+                    if (result.bytes.length < partSize) {
+                        return fileWriter.getValue()
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
                     }
                 }
-
                 // Last chunk.
+<<<<<<< HEAD
                 if (result.bytes.length < partSize) {
+=======
+                if (args.end && (offset + results[results.length - 1].bytes.length) > args.end) {
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
                     return fileWriter.getValue()
                 }
             }
@@ -657,9 +714,22 @@ class TelegramClient {
             return
         }
 
+<<<<<<< HEAD
         const size = doc.thumbs ? this._pickFileSize(doc.thumbs, args.sizeType) : null
         if (size && (size instanceof constructors.PhotoCachedSize || size instanceof constructors.PhotoStrippedSize)) {
             return this._downloadCachedPhotoSize(size)
+=======
+        let size = null
+        if (args.sizeType) {
+            size = doc.thumbs ? this._pickFileSize(doc.thumbs, args.sizeType) : null
+            if (!size && doc.mimeType.startsWith('video/')) {
+                return
+            }
+
+            if (size && (size instanceof constructors.PhotoCachedSize || size instanceof constructors.PhotoStrippedSize)) {
+                return this._downloadCachedPhotoSize(size)
+            }
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
         }
 <<<<<<< HEAD
         const result = await this.downloadFile(
@@ -681,6 +751,7 @@ class TelegramClient {
                 fileSize: size ? size.size : doc.size,
                 progressCallback: args.progressCallback,
                 dcId: doc.dcId,
+                workers: args.workers,
             },
         )
         return result
@@ -831,6 +902,7 @@ class TelegramClient {
         if (await this.isUserAuthorized()) {
             this._onAuth()
 
+<<<<<<< HEAD
             return this
         }
         if (args.code == null && !args.botToken) {
@@ -872,6 +944,16 @@ class TelegramClient {
                 if (!value) {
                     throw new Error('the phone code is empty')
                 }
+=======
+        if (await checkAuthorization(this)) {
+            return
+        }
+
+        const apiCredentials = {
+            apiId: this.apiId,
+            apiHash: this.apiHash
+        }
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
 
                 if (signUp) {
                     const [firstName, lastName] = await args.firstAndLastNames()
@@ -944,6 +1026,7 @@ class TelegramClient {
         return this
     }
 
+<<<<<<< HEAD
     async signIn(args = {
         phone: null,
         code: null,
@@ -1019,6 +1102,10 @@ class TelegramClient {
         }
 
         return [phone, phoneHash]
+=======
+    uploadFile(fileParams) {
+        return uploadFile(this, fileParams)
+>>>>>>> bb792a8a... GramJS: Parallel download; Progressive: Fix caching
     }
 
     // endregion

+ 163 - 0
src/serviceWorker/progressive.ts

@@ -0,0 +1,163 @@
+import { pause } from '../util/schedulers';
+import generateIdFor from '../util/generateIdFor';
+import { DEBUG, MEDIA_CACHE_MAX_BYTES, MEDIA_PROGRESSIVE_CACHE_NAME } from '../config';
+
+declare const self: ServiceWorkerGlobalScope;
+
+type PartInfo = {
+  type: 'PartInfo';
+  arrayBuffer: ArrayBuffer;
+  mimeType: 'string';
+  fullSize: number;
+};
+
+type RequestStates = {
+  resolve: (response: PartInfo) => void;
+  reject: () => void;
+};
+
+const DEFAULT_PART_SIZE = 512 * 1024; // 512 kB
+const PART_TIMEOUT = 10000;
+const MAX_END_TO_CACHE = 3 * 1024 * 1024 - 1; // We only cache the first 3 MB of each file
+
+const requestStates: Record<string, RequestStates> = {};
+
+export async function respondForProgressive(e: FetchEvent) {
+  const { url } = e.request;
+  const range = e.request.headers.get('range');
+  const bytes = /^bytes=(\d+)-(\d+)?$/g.exec(range || '')!;
+  const start = Number(bytes[1]);
+
+  let end = Number(bytes[2]);
+  if (!end || (end - start + 1) > DEFAULT_PART_SIZE) {
+    end = start + DEFAULT_PART_SIZE - 1;
+  }
+
+  const cacheKey = `${url}?start=${start}&end=${end}`;
+  const [cachedArrayBuffer, cachedHeaders] = await fetchFromCache(cacheKey);
+
+  if (DEBUG) {
+    // eslint-disable-next-line no-console
+    console.log('FETCH PROGRESSIVE', cacheKey, 'CACHED:', Boolean(cachedArrayBuffer));
+  }
+
+  if (cachedArrayBuffer) {
+    return new Response(cachedArrayBuffer, {
+      status: 206,
+      statusText: 'Partial Content',
+      headers: cachedHeaders,
+    });
+  }
+
+  let partInfo;
+  try {
+    partInfo = await requestPart(e, { url, start, end });
+  } catch (err) {
+    if (DEBUG) {
+      // eslint-disable-next-line no-console
+      console.error('FETCH PROGRESSIVE', err);
+    }
+  }
+
+  if (!partInfo) {
+    return new Response('', {
+      status: 500,
+      statusText: 'Failed to fetch progressive part',
+    });
+  }
+
+  const { arrayBuffer, fullSize, mimeType } = partInfo;
+
+  const partSize = Math.min(end - start + 1, arrayBuffer.byteLength);
+  end = start + partSize - 1;
+  const arrayBufferPart = arrayBuffer.slice(0, partSize);
+  const headers = [
+    ['Content-Range', `bytes ${start}-${end}/${fullSize}`],
+    ['Accept-Ranges', 'bytes'],
+    ['Content-Length', String(partSize)],
+    ['Content-Type', mimeType],
+  ];
+
+  if (partSize <= MEDIA_CACHE_MAX_BYTES && end < MAX_END_TO_CACHE) {
+    saveToCache(cacheKey, arrayBufferPart, headers);
+  }
+
+  return new Response(arrayBufferPart, {
+    status: 206,
+    statusText: 'Partial Content',
+    headers,
+  });
+}
+
+self.addEventListener('message', (e) => {
+  const { type, messageId, result } = e.data as {
+    type: string;
+    messageId: string;
+    result: PartInfo;
+  };
+
+  if (type === 'partResponse' && requestStates[messageId]) {
+    requestStates[messageId].resolve(result);
+  }
+});
+
+// We can not cache 206 responses: https://github.com/GoogleChrome/workbox/issues/1644#issuecomment-638741359
+async function fetchFromCache(cacheKey: string) {
+  const cache = await self.caches.open(MEDIA_PROGRESSIVE_CACHE_NAME);
+
+  return Promise.all([
+    cache.match(`${cacheKey}&type=arrayBuffer`).then((r) => (r ? r.arrayBuffer() : undefined)),
+    cache.match(`${cacheKey}&type=headers`).then((r) => (r ? r.json() : undefined)),
+  ]);
+}
+
+async function saveToCache(cacheKey: string, arrayBuffer: ArrayBuffer, headers: HeadersInit) {
+  const cache = await self.caches.open(MEDIA_PROGRESSIVE_CACHE_NAME);
+
+  return Promise.all([
+    cache.put(new Request(`${cacheKey}&type=arrayBuffer`), new Response(arrayBuffer)),
+    cache.put(new Request(`${cacheKey}&type=headers`), new Response(JSON.stringify(headers))),
+  ]);
+}
+
+async function requestPart(
+  e: FetchEvent,
+  params: { url: string; start: number; end: number },
+): Promise<PartInfo | undefined> {
+  if (!e.clientId) {
+    return undefined;
+  }
+
+  // eslint-disable-next-line no-restricted-globals
+  const client = await self.clients.get(e.clientId);
+  if (!client) {
+    return undefined;
+  }
+
+  const messageId = generateIdFor(requestStates);
+  requestStates[messageId] = {} as RequestStates;
+
+  const promise = Promise.race([
+    pause(PART_TIMEOUT).then(() => Promise.reject(new Error('ERROR_PART_TIMEOUT'))),
+    new Promise<PartInfo>((resolve, reject) => {
+      Object.assign(requestStates[messageId], { resolve, reject });
+    }),
+  ]);
+
+  promise.then(
+    () => {
+      delete requestStates[messageId];
+    },
+    () => {
+      delete requestStates[messageId];
+    },
+  );
+
+  client.postMessage({
+    type: 'requestPart',
+    messageId,
+    params,
+  });
+
+  return promise;
+}