NewStatusPipeline.php 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. <?php
  2. namespace App\Jobs\StatusPipeline;
  3. use App\Media;
  4. use App\Status;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use Illuminate\Support\Facades\Log;
  /**
   * Queued job that hands a status over to StatusEntityLexer.
   *
   * When cloud storage is enabled (and fast media processing is off) the job
   * first waits until no Media row attached to the status is left without a
   * `cdn_url`, re-queueing itself while it waits.
   */
  class NewStatusPipeline implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /**
       * Minutes after which pending media no longer blocks the hand-off.
       */
      protected const MEDIA_WAIT_LIMIT_MINUTES = 10;

      /**
       * Delay, in seconds, before the job re-checks pending media.
       */
      protected const MEDIA_RECHECK_DELAY_SECONDS = 30;

      /**
       * The status this job processes.
       *
       * @var \App\Status
       */
      protected $status;

      /**
       * Delete the job if its models no longer exist.
       *
       * @var bool
       */
      public $deleteWhenMissingModels = true;

      /**
       * Maximum number of seconds a single attempt may run.
       *
       * @var int
       */
      public $timeout = 30;

      /**
       * Number of times to attempt the job
       *
       * @var int
       */
      public $tries = 3;

      /**
       * Backoff periods between retries (in seconds)
       *
       * @var array
       */
      public $backoff = [30, 60, 120];

      /**
       * Create a new job instance.
       *
       * @param  \App\Status  $status  Status to process.
       * @return void
       */
      public function __construct(Status $status)
      {
          $this->status = $status;
      }

      /**
       * Execute the job.
       *
       * Dispatches the follow-up job immediately when the media check does not
       * apply or nothing is pending; otherwise either gives up waiting (stalled
       * media) or releases the job back onto the queue to check again later.
       *
       * @return void
       */
      public function handle()
      {
          // Skip media check if cloud storage isn't enabled or fast processing is on
          if (! config_cache('pixelfed.cloud_storage') || config('pixelfed.media_fast_process')) {
              $this->dispatchFederation();

              return;
          }

          // A single query answers both "is anything still pending?" and
          // "which pending item is the oldest?" (oldest() orders by created_at).
          $oldestPendingMedia = Media::whereStatusId($this->status->id)
              ->whereNull('cdn_url')
              ->oldest()
              ->first();

          // All media processed, proceed with federation
          if ($oldestPendingMedia === null) {
              $this->dispatchFederation();

              return;
          }

          // If media has been processing for longer than the wait limit, proceed anyway.
          // NOTE(review): the age is measured from `replicated_at`. If that column is
          // only filled once a media item has its `cdn_url`, this branch can never
          // fire for pending media - confirm whether `created_at` was intended.
          $replicatedAt = $oldestPendingMedia->replicated_at;
          if ($replicatedAt && $replicatedAt->diffInMinutes(now()) > self::MEDIA_WAIT_LIMIT_MINUTES) {
              if (config('federation.activitypub.delivery.logger.enabled')) {
                  Log::warning('Media processing timeout for status '.$this->status->id.'. Proceeding with federation.');
              }
              $this->dispatchFederation();

              return;
          }

          // Release job back to queue so the media state is checked again later.
          // NOTE(review): a released job presumably still counts as an attempt, so with
          // $tries = 3 the job would be marked failed long before the wait limit above
          // is reached - confirm, and consider a time-based retry window instead.
          $this->release(self::MEDIA_RECHECK_DELAY_SECONDS);
      }

      /**
       * Dispatch the StatusEntityLexer job for this status.
       *
       * A failure is logged (when the delivery logger is enabled) and then
       * rethrown so the caller still sees it.
       *
       * @return void
       *
       * @throws \Exception
       */
      protected function dispatchFederation()
      {
          try {
              StatusEntityLexer::dispatch($this->status);
          } catch (\Exception $e) {
              if (config('federation.activitypub.delivery.logger.enabled')) {
                  Log::error('Federation dispatch failed for status '.$this->status->id.': '.$e->getMessage());
              }

              throw $e;
          }
      }

      /**
       * Handle a job failure.
       *
       * Logs the exception message and stack trace when the delivery logger is enabled.
       *
       * @param  \Throwable  $exception  The error passed in for the failed job.
       * @return void
       */
      public function failed(\Throwable $exception)
      {
          if (config('federation.activitypub.delivery.logger.enabled')) {
              Log::error('NewStatusPipeline failed for status '.$this->status->id, [
                  'exception' => $exception->getMessage(),
                  'trace' => $exception->getTraceAsString(),
              ]);
          }
      }
  }