StoryFanout.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. <?php
  2. namespace App\Jobs\StoryPipeline;
  3. use Cache, Log;
  4. use App\Story;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use League\Fractal;
  11. use League\Fractal\Serializer\ArraySerializer;
  12. use App\Transformer\ActivityPub\Verb\CreateStory;
  13. use App\Util\ActivityPub\Helpers;
  14. use GuzzleHttp\Pool;
  15. use GuzzleHttp\Client;
  16. use GuzzleHttp\Promise;
  17. use App\Util\ActivityPub\HttpSignature;
  18. use App\Services\FollowerService;
  19. use App\Services\StoryService;
  /**
   * Queued job that delivers a locally authored story to remote inboxes.
   *
   * The story is serialized with the CreateStory transformer, each request is
   * signed with HttpSignature on behalf of the story's profile, and the
   * requests are sent concurrently through a Guzzle request pool.
   */
  class StoryFanout implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /**
       * The story being delivered.
       *
       * @var \App\Story
       */
      protected $story;

      /**
       * Delete the job if its models no longer exist.
       *
       * @var bool
       */
      public $deleteWhenMissingModels = true;

      /**
       * Create a new job instance.
       *
       * @param  \App\Story  $story  Story to deliver.
       * @return void
       */
      public function __construct(Story $story)
      {
          $this->story = $story;
      }

      /**
       * Execute the job.
       *
       * Returns without sending anything when the story is not local, carries
       * a remote URL, has an empty audience, or cannot be encoded as JSON.
       *
       * @return void
       */
      public function handle()
      {
          $story = $this->story;

          // Skip anything that is not flagged local or that has a remote URL.
          if (! $story->local || $story->remote_url) {
              return;
          }

          StoryService::delLatest($story->profile_id);

          $audience = FollowerService::softwareAudience($story->profile_id, 'pixelfed');

          if (empty($audience)) {
              // Return on profiles with no remote followers
              return;
          }

          $activity = $this->buildActivity($story);
          $payload = json_encode($activity);

          if ($payload === false) {
              // json_encode() signals failure with false; abort rather than
              // POSTing an invalid body to every inbox in the audience.
              Log::warning(
                  'StoryFanout: unable to encode activity for profile '
                  . $story->profile_id . ': ' . json_last_error_msg()
              );

              return;
          }

          $client = new Client([
              'timeout' => config('federation.activitypub.delivery.timeout'),
          ]);

          // The profile relation is only read once delivery is certain.
          $requests = $this->deliveryRequests($client, $story->profile, $audience, $activity, $payload);

          $this->deliver($client, $requests, $story->profile_id);
      }

      /**
       * Serialize the story into its activity array via the CreateStory transformer.
       *
       * @param  \App\Story  $story
       * @return mixed  Result of Fractal's toArray() for the story resource.
       */
      private function buildActivity($story)
      {
          $fractal = new Fractal\Manager();
          $fractal->setSerializer(new ArraySerializer());

          $resource = new Fractal\Resource\Item($story, new CreateStory());

          return $fractal->createData($resource)->toArray();
      }

      /**
       * Lazily yield one request factory per audience URL.
       *
       * Headers are produced by HttpSignature::sign() for each URL only when
       * the pool pulls the next entry from this generator.
       *
       * @param  \GuzzleHttp\Client  $client
       * @param  mixed  $profile  Profile passed to HttpSignature::sign().
       * @param  iterable  $audience  Inbox URLs to deliver to.
       * @param  mixed  $activity  Activity passed to HttpSignature::sign().
       * @param  string  $payload  JSON-encoded activity sent as the POST body.
       * @return \Generator
       */
      private function deliveryRequests($client, $profile, $audience, $activity, $payload)
      {
          foreach ($audience as $url) {
              $headers = HttpSignature::sign($profile, $url, $activity);

              yield static function () use ($client, $url, $headers, $payload) {
                  return $client->postAsync($url, [
                      'curl' => [
                          CURLOPT_HTTPHEADER => $headers,
                          CURLOPT_POSTFIELDS => $payload,
                          CURLOPT_HEADER => true,
                      ],
                  ]);
              };
          }
      }

      /**
       * Run the requests through a Guzzle pool and wait for it to settle.
       *
       * Rejected requests are logged rather than silently discarded; they do
       * not interrupt the remaining deliveries.
       *
       * @param  \GuzzleHttp\Client  $client
       * @param  iterable  $requests  Request factories to execute.
       * @param  mixed  $profileId  Profile id included in failure log entries.
       * @return void
       */
      private function deliver($client, $requests, $profileId)
      {
          $pool = new Pool($client, $requests, [
              'concurrency' => config('federation.activitypub.delivery.concurrency'),
              'rejected' => static function ($reason, $index) use ($profileId) {
                  $message = $reason instanceof \Throwable
                      ? $reason->getMessage()
                      : 'unknown error';

                  Log::warning(
                      'StoryFanout: delivery #' . $index . ' failed for profile '
                      . $profileId . ': ' . $message
                  );
              },
          ]);

          $pool->promise()->wait();
      }
  }