浏览代码

Update NewStatusPipeline, improve fallback

Daniel Supernault 4 月之前
父节点
当前提交
863ad8e107
共有 2 个文件被更改,包括 14 次插入21 次删除
  1. 12 20
      app/Jobs/StatusPipeline/NewStatusPipeline.php
  2. 2 1
      app/Media.php

+ 12 - 20
app/Jobs/StatusPipeline/NewStatusPipeline.php

@@ -4,14 +4,12 @@ namespace App\Jobs\StatusPipeline;
 
 
 use App\Media;
 use App\Media;
 use App\Status;
 use App\Status;
-use Cache;
 use Illuminate\Bus\Queueable;
 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;
 use Illuminate\Queue\SerializesModels;
 use Illuminate\Support\Facades\Log;
 use Illuminate\Support\Facades\Log;
-use Illuminate\Support\Facades\Redis;
 
 
 class NewStatusPipeline implements ShouldQueue
 class NewStatusPipeline implements ShouldQueue
 {
 {
@@ -64,17 +62,10 @@ class NewStatusPipeline implements ShouldQueue
      */
      */
     public function handle()
     public function handle()
     {
     {
-        // Check if status still exists
-        if (!Status::where('id', $this->status->id)->exists()) {
-            if(config('federation.activitypub.delivery.logger.enabled')) {
-                Log::info('Status ' . $this->status->id . ' was deleted before federation');
-            }
-            return;
-        }
-
         // Skip media check if cloud storage isn't enabled or fast processing is on
         // Skip media check if cloud storage isn't enabled or fast processing is on
-        if (!config_cache('pixelfed.cloud_storage') || config('pixelfed.media_fast_process')) {
+        if (! config_cache('pixelfed.cloud_storage') || config('pixelfed.media_fast_process')) {
             $this->dispatchFederation();
             $this->dispatchFederation();
+
             return;
             return;
         }
         }
 
 
@@ -91,16 +82,18 @@ class NewStatusPipeline implements ShouldQueue
                 ->first();
                 ->first();
 
 
             // If media has been processing for more than 10 minutes, proceed anyway
             // If media has been processing for more than 10 minutes, proceed anyway
-            if ($oldestProcessingMedia && now()->diffInMinutes($oldestProcessingMedia->created_at) > 10) {
-                if(config('federation.activitypub.delivery.logger.enabled')) {
-                    Log::warning('Media processing timeout for status ' . $this->status->id . '. Proceeding with federation.');
+            if ($oldestProcessingMedia && $oldestProcessingMedia->replicated_at && $oldestProcessingMedia->replicated_at->diffInMinutes(now()) > 10) {
+                if (config('federation.activitypub.delivery.logger.enabled')) {
+                    Log::warning('Media processing timeout for status '.$this->status->id.'. Proceeding with federation.');
                 }
                 }
                 $this->dispatchFederation();
                 $this->dispatchFederation();
+
                 return;
                 return;
             }
             }
 
 
             // Release job back to queue with delay of 30 seconds
             // Release job back to queue with delay of 30 seconds
             $this->release(30);
             $this->release(30);
+
             return;
             return;
         }
         }
 
 
@@ -118,8 +111,8 @@ class NewStatusPipeline implements ShouldQueue
         try {
         try {
             StatusEntityLexer::dispatch($this->status);
             StatusEntityLexer::dispatch($this->status);
         } catch (\Exception $e) {
         } catch (\Exception $e) {
-            if(config('federation.activitypub.delivery.logger.enabled')) {
-                Log::error('Federation dispatch failed for status ' . $this->status->id . ': ' . $e->getMessage());
+            if (config('federation.activitypub.delivery.logger.enabled')) {
+                Log::error('Federation dispatch failed for status '.$this->status->id.': '.$e->getMessage());
             }
             }
             throw $e;
             throw $e;
         }
         }
@@ -128,15 +121,14 @@ class NewStatusPipeline implements ShouldQueue
     /**
     /**
      * Handle a job failure.
      * Handle a job failure.
      *
      *
-     * @param  \Throwable  $exception
      * @return void
      * @return void
      */
      */
     public function failed(\Throwable $exception)
     public function failed(\Throwable $exception)
     {
     {
-        if(config('federation.activitypub.delivery.logger.enabled')) {
-            Log::error('NewStatusPipeline failed for status ' . $this->status->id, [
+        if (config('federation.activitypub.delivery.logger.enabled')) {
+            Log::error('NewStatusPipeline failed for status '.$this->status->id, [
                 'exception' => $exception->getMessage(),
                 'exception' => $exception->getMessage(),
-                'trace' => $exception->getTraceAsString()
+                'trace' => $exception->getTraceAsString(),
             ]);
             ]);
         }
         }
     }
     }

+ 2 - 1
app/Media.php

@@ -22,7 +22,8 @@ class Media extends Model
     protected $casts = [
     protected $casts = [
         'srcset' => 'array',
         'srcset' => 'array',
         'deleted_at' => 'datetime',
         'deleted_at' => 'datetime',
-        'skip_optimize' => 'boolean'
+        'skip_optimize' => 'boolean',
+        'replicated_at' => 'datetime',
     ];
     ];
 
 
     public function status()
     public function status()