ProfileMigrationDeliverMoveActivityPipeline.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. <?php
  2. namespace App\Jobs\ProfilePipeline;
  3. use App\Transformer\ActivityPub\Verb\Move;
  4. use App\Util\ActivityPub\HttpSignature;
  5. use GuzzleHttp\Client;
  6. use GuzzleHttp\Pool;
  7. use Illuminate\Bus\Batchable;
  8. use Illuminate\Bus\Queueable;
  9. use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
  10. use Illuminate\Contracts\Queue\ShouldQueue;
  11. use Illuminate\Foundation\Bus\Dispatchable;
  12. use Illuminate\Queue\InteractsWithQueue;
  13. use Illuminate\Queue\Middleware\WithoutOverlapping;
  14. use Illuminate\Queue\SerializesModels;
  15. use League\Fractal;
  16. use League\Fractal\Serializer\ArraySerializer;
  /**
   * Queued job that sends the ActivityPub "Move" activity for a profile
   * migration to every inbox in the old account's audience.
   *
   * The job is unique per migration id (the lock is held until processing
   * starts) and additionally guarded by a WithoutOverlapping middleware using
   * the same key, so two deliveries for one migration never run concurrently.
   */
  class ProfileMigrationDeliverMoveActivityPipeline implements ShouldBeUniqueUntilProcessing, ShouldQueue
  {
      use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /** The migration record that is serialized into the Move activity. */
      public $migration;

      /** The account being moved away from; its key signs the deliveries. */
      public $oldAccount;

      /** The account being moved to (stored on the job, not read by handle()). */
      public $newAccount;

      /** Maximum number of seconds the job may run. */
      public $timeout = 1400;

      /** Maximum number of attempts. */
      public $tries = 3;

      /** Maximum number of unhandled exceptions before the job is failed. */
      public $maxExceptions = 1;

      /** Mark the job as failed when it times out. */
      public $failOnTimeout = true;

      /**
       * The number of seconds after which the job's unique lock will be released.
       *
       * @var int
       */
      public $uniqueFor = 3600;

      /**
       * Get the unique ID for the job.
       */
      public function uniqueId(): string
      {
          return $this->lockKey();
      }

      /**
       * Get the middleware the job should pass through.
       *
       * The overlap lock is shared across job classes and an overlapping job
       * is not released back onto the queue.
       *
       * @return array<int, object>
       */
      public function middleware(): array
      {
          return [(new WithoutOverlapping($this->lockKey()))->shared()->dontRelease()];
      }

      /**
       * Create a new job instance.
       *
       * @param  mixed  $migration  migration record; its `id` keys the job lock
       * @param  mixed  $oldAccount  profile the followers are moved away from
       * @param  mixed  $newAccount  profile the followers are moved to
       */
      public function __construct($migration, $oldAccount, $newAccount)
      {
          $this->migration = $migration;
          $this->oldAccount = $oldAccount;
          $this->newAccount = $newAccount;
      }

      /**
       * Execute the job.
       *
       * Builds the Move activity once, signs it per inbox with the old
       * account's key and POSTs it to every audience inbox through a
       * concurrent Guzzle pool. Individual delivery results are ignored
       * (best effort); the method returns when the pool has drained.
       *
       * @throws \RuntimeException when the activity cannot be JSON encoded
       */
      public function handle(): void
      {
          // batch() returns null when the job was not dispatched as part of a
          // batch (or the batch record is gone); only bail out on a real,
          // cancelled batch instead of crashing on a null dereference.
          $batch = $this->batch();
          if ($batch !== null && $batch->cancelled()) {
              return;
          }

          $migration = $this->migration;
          $profile = $this->oldAccount;

          // Only profiles without a domain (presumably local accounts) that
          // own a private key can sign and send the activity.
          if ($profile->domain || ! $profile->private_key) {
              return;
          }

          $audience = $profile->getAudienceInbox();
          if (empty($audience)) {
              // Nothing to deliver to; skip building the payload and client.
              return;
          }

          $fractal = new Fractal\Manager();
          $fractal->setSerializer(new ArraySerializer());
          $resource = new Fractal\Resource\Item($migration, new Move());
          $activity = $fractal->createData($resource)->toArray();

          // NOTE(review): default json_encode flags are kept on purpose -
          // HttpSignature::sign() receives the array and presumably digests
          // its own encoding of it, so the body must match; confirm before
          // adding flags here.
          $payload = json_encode($activity);
          if ($payload === false) {
              throw new \RuntimeException('Unable to encode Move activity: '.json_last_error_msg());
          }

          $client = new Client([
              'timeout' => config('federation.activitypub.delivery.timeout'),
          ]);

          $version = config('pixelfed.version');
          $appUrl = config('app.url');
          $userAgent = "(Pixelfed/{$version}; +{$appUrl})";

          // Lazily yields one request factory per inbox so the pool only
          // signs and opens as many requests as its concurrency allows.
          $requests = static function ($audience) use ($client, $activity, $profile, $payload, $userAgent) {
              foreach ($audience as $url) {
                  $headers = HttpSignature::sign($profile, $url, $activity, [
                      'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
                      'User-Agent' => $userAgent,
                  ]);

                  yield static function () use ($client, $url, $headers, $payload) {
                      return $client->postAsync($url, [
                          'curl' => [
                              CURLOPT_HTTPHEADER => $headers,
                              CURLOPT_POSTFIELDS => $payload,
                              CURLOPT_HEADER => true,
                              CURLOPT_SSL_VERIFYPEER => true,
                              // 2 = the certificate must match the requested
                              // host name. The previous `false` (0) accepted
                              // any valid certificate for any host.
                              CURLOPT_SSL_VERIFYHOST => 2,
                          ],
                      ]);
                  };
              }
          };

          $pool = new Pool($client, $requests($audience), [
              'concurrency' => config('federation.activitypub.delivery.concurrency'),
              // Delivery is best effort: neither successful nor failed
              // responses are inspected or retried individually.
              'fulfilled' => static function ($response, $index) {
              },
              'rejected' => static function ($reason, $index) {
              },
          ]);

          $pool->promise()->wait();
      }

      /**
       * Build the lock key shared by the unique-job lock and the
       * WithoutOverlapping middleware.
       */
      private function lockKey(): string
      {
          return 'profile:migration:deliver-move-followers:id:'.$this->migration->id;
      }
  }