Преглед на файлове

Update InboxPipeline, add inbox job queue and separate http sig validation from activity handling

Daniel Supernault преди 2 години
родител
ревизия
e6c1604d43

+ 47 - 0
app/Jobs/InboxPipeline/ActivityHandler.php

@@ -0,0 +1,47 @@
+<?php
+
+namespace App\Jobs\InboxPipeline;
+
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldBeUnique;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use App\Util\ActivityPub\Inbox;
+
+class ActivityHandler implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    protected $username;
+    protected $headers;
+    protected $payload;
+
+    public $timeout = 300;
+    public $tries = 1;
+    public $maxExceptions = 1;
+
+    /**
+     * Create a new job instance.
+     *
+     * @return void
+     */
+    public function __construct($headers, $username, $payload)
+    {
+        $this->username = $username;
+        $this->headers = $headers;
+        $this->payload = $payload;
+    }
+
+    /**
+     * Execute the job.
+     *
+     * @return void
+     */
+    public function handle()
+    {
+        (new Inbox($this->headers, $this->profile, $this->payload))->handle();
+        return;
+    }
+}

+ 1 - 1
app/Jobs/InboxPipeline/DeleteWorker.php

@@ -105,7 +105,7 @@ class DeleteWorker implements ShouldQueue
 		$profile = null;
 
 		if($this->verifySignature($headers, $payload) == true) {
-			(new Inbox($headers, $profile, $payload))->handle();
+			ActivityHandler::dispatch($headers, $profile, $payload)->onQueue('delete');
 			return 1;
 		} else {
 			return 1;

+ 1 - 1
app/Jobs/InboxPipeline/InboxValidator.php

@@ -78,7 +78,7 @@ class InboxValidator implements ShouldQueue
         }
 
         if($this->verifySignature($headers, $profile, $payload) == true) {
-            (new Inbox($headers, $profile, $payload))->handle();
+            ActivityHandler::dispatch($headers, $profile, $payload)->onQueue('inbox');
             return;
         } else {
             return;

+ 1 - 1
app/Jobs/InboxPipeline/InboxWorker.php

@@ -66,7 +66,7 @@ class InboxWorker implements ShouldQueue
         }
 
         if($this->verifySignature($headers, $payload) == true) {
-            (new Inbox($headers, $profile, $payload))->handle();
+            ActivityHandler::dispatch($headers, $profile, $payload)->onQueue('inbox');
             return;
         } else {
             return;

+ 3 - 2
config/horizon.php

@@ -83,6 +83,7 @@ return [
 		'redis:follow' => 30,
 		'redis:shared' => 30,
 		'redis:default' => 30,
+		'redis:inbox' => 30,
 		'redis:low' => 30,
 		'redis:high' => 30,
 		'redis:delete' => 30,
@@ -171,7 +172,7 @@ return [
 		'production' => [
 			'supervisor-1' => [
 				'connection'    => 'redis',
-				'queue'         => ['high', 'default', 'follow', 'shared', 'feed', 'low', 'story', 'delete', 'mmo'],
+				'queue'         => ['high', 'default', 'follow', 'shared', 'inbox', 'feed', 'low', 'story', 'delete', 'mmo'],
 				'balance'       => env('HORIZON_BALANCE_STRATEGY', 'auto'),
 				'minProcesses'  => env('HORIZON_MIN_PROCESSES', 1),
 				'maxProcesses'  => env('HORIZON_MAX_PROCESSES', 20),
@@ -185,7 +186,7 @@ return [
 		'local' => [
 			'supervisor-1' => [
 				'connection'    => 'redis',
-				'queue'         => ['high', 'default', 'follow', 'shared', 'feed', 'low', 'story', 'delete', 'mmo'],
+				'queue'         => ['high', 'default', 'follow', 'shared', 'inbox', 'feed', 'low', 'story', 'delete', 'mmo'],
 				'balance'       => 'auto',
 				'minProcesses' => 1,
 				'maxProcesses'  => 20,