MoveMigrateFollowersPipeline.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. <?php
  2. namespace App\Jobs\MovePipeline;
  3. use App\Follower;
  4. use App\Util\ActivityPub\Helpers;
  5. use DateTime;
  6. use DB;
  7. use Exception;
  8. use Illuminate\Contracts\Queue\ShouldQueue;
  9. use Illuminate\Foundation\Queue\Queueable;
  10. use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
  11. use Illuminate\Queue\Middleware\WithoutOverlapping;
  12. class MoveMigrateFollowersPipeline implements ShouldQueue
  13. {
  14. use Queueable;
  15. public $target;
  16. public $activity;
  17. /**
  18. * The number of times the job may be attempted.
  19. *
  20. * @var int
  21. */
  22. public $tries = 15;
  23. /**
  24. * The maximum number of unhandled exceptions to allow before failing.
  25. *
  26. * @var int
  27. */
  28. public $maxExceptions = 5;
  29. /**
  30. * The number of seconds the job can run before timing out.
  31. *
  32. * @var int
  33. */
  34. public $timeout = 900;
  35. /**
  36. * Create a new job instance.
  37. */
  38. public function __construct($target, $activity)
  39. {
  40. $this->target = $target;
  41. $this->activity = $activity;
  42. }
  43. /**
  44. * Get the middleware the job should pass through.
  45. *
  46. * @return array<int, object>
  47. */
  48. public function middleware(): array
  49. {
  50. return [
  51. new WithoutOverlapping('process-move-migrate-followers:'.$this->target),
  52. (new ThrottlesExceptionsWithRedis(5, 2 * 60))->backoff(1),
  53. ];
  54. }
  55. /**
  56. * Determine the time at which the job should timeout.
  57. */
  58. public function retryUntil(): DateTime
  59. {
  60. return now()->addMinutes(15);
  61. }
  62. /**
  63. * Execute the job.
  64. */
  65. public function handle(): void
  66. {
  67. if (config('app.env') !== 'production' || (bool) config_cache('federation.activitypub.enabled') == false) {
  68. throw new Exception('Activitypub not enabled');
  69. }
  70. $target = $this->target;
  71. $actor = $this->activity;
  72. $targetAccount = Helpers::profileFetch($target);
  73. $actorAccount = Helpers::profileFetch($actor);
  74. if (! $targetAccount || ! $actorAccount) {
  75. throw new Exception('Invalid move accounts');
  76. }
  77. $activity = [
  78. '@context' => 'https://www.w3.org/ns/activitystreams',
  79. 'type' => 'Follow',
  80. 'actor' => null,
  81. 'object' => $target,
  82. ];
  83. $version = config('pixelfed.version');
  84. $appUrl = config('app.url');
  85. $userAgent = "(Pixelfed/{$version}; +{$appUrl})";
  86. $addlHeaders = [
  87. 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
  88. 'User-Agent' => $userAgent,
  89. ];
  90. $targetInbox = $targetAccount['sharedInbox'] ?? $targetAccount['inbox_url'];
  91. $targetPid = $targetAccount['id'];
  92. DB::table('followers')
  93. ->join('profiles', 'followers.profile_id', '=', 'profiles.id')
  94. ->where('followers.following_id', $actorAccount['id'])
  95. ->whereNotNull('profiles.user_id')
  96. ->whereNull('profiles.deleted_at')
  97. ->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key', 'profiles.status')
  98. ->chunkById(100, function ($followers) use ($targetInbox, $targetPid, $target) {
  99. foreach ($followers as $follower) {
  100. if (! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') {
  101. continue;
  102. }
  103. Follower::updateOrCreate([
  104. 'profile_id' => $follower->id,
  105. 'following_id' => $targetPid,
  106. ]);
  107. MoveSendFollowPipeline::dispatch($follower, $targetInbox, $targetPid, $target)->onQueue('follow');
  108. }
  109. }, 'id');
  110. }
  111. }