Преглед изворни кода

Update StatusDeletePipeline, make async

Daniel Supernault пре 6 година
родитељ
комит
0d73a57d75
1 измењених фајлова са 36 додато и 3 уклоњено
  1. 36 3
      app/Jobs/StatusPipeline/StatusDelete.php

+ 36 - 3
app/Jobs/StatusPipeline/StatusDelete.php

@@ -17,6 +17,10 @@ use League\Fractal;
 use League\Fractal\Serializer\ArraySerializer;
 use League\Fractal\Serializer\ArraySerializer;
 use App\Transformer\ActivityPub\Verb\DeleteNote;
 use App\Transformer\ActivityPub\Verb\DeleteNote;
 use App\Util\ActivityPub\Helpers;
 use App\Util\ActivityPub\Helpers;
+use GuzzleHttp\Pool;
+use GuzzleHttp\Client;
+use GuzzleHttp\Promise;
+use App\Util\ActivityPub\HttpSignature;
 
 
 class StatusDelete implements ShouldQueue
 class StatusDelete implements ShouldQueue
 {
 {
@@ -109,9 +113,38 @@ class StatusDelete implements ShouldQueue
 
 
         $this->unlinkRemoveMedia($status);
         $this->unlinkRemoveMedia($status);
         
         
-        foreach($audience as $url) {
-            Helpers::sendSignedObject($profile, $url, $activity);
-        }
+        $payload = json_encode($activity);
+        
+        $client = new Client([
+            'timeout'  => config('pixelfed.ap_delivery_timeout')
+        ]);
+
+        $requests = function($audience) use ($client, $activity, $profile, $payload) {
+            foreach($audience as $url) {
+                $headers = HttpSignature::sign($profile, $url, $activity);
+                yield function() use ($client, $url, $headers, $payload) {
+                    return $client->postAsync($url, [
+                        'curl' => [
+                            CURLOPT_HTTPHEADER => $headers, 
+                            CURLOPT_POSTFIELDS => $payload,
+                            CURLOPT_HEADER => true
+                        ]
+                    ]);
+                };
+            }
+        };
+
+        $pool = new Pool($client, $requests($audience), [
+            'concurrency' => config('pixelfed.ap_delivery_concurrency'),
+            'fulfilled' => function ($response, $index) {
+            },
+            'rejected' => function ($reason, $index) {
+            }
+        ]);
+        
+        $promise = $pool->promise();
+
+        $promise->wait();
 
 
     }
     }
 }
 }