浏览代码

Merge pull request #4603 from pixelfed/staging

Prepare for groups
daniel 1 年之前
父节点
当前提交
28bca423d7

+ 4 - 0
CHANGELOG.md

@@ -1,6 +1,10 @@
 # Release Notes
 
 ## [Unreleased](https://github.com/pixelfed/pixelfed/compare/v0.11.9...dev)
+
+### Updates
+- Update FollowerService, add forget method to RelationshipService call to reduce load when mass purging ([347e4f59](https://github.com/pixelfed/pixelfed/commit/347e4f59))
+- Update FollowServiceWarmCache, improve handling larger following/follower lists ([61a6d904](https://github.com/pixelfed/pixelfed/commit/61a6d904))
 -  ([](https://github.com/pixelfed/pixelfed/commit/))
 
 ## [v0.11.9 (2023-08-21)](https://github.com/pixelfed/pixelfed/compare/v0.11.8...v0.11.9)

+ 62 - 19
app/Jobs/FollowPipeline/FollowServiceWarmCache.php

@@ -8,10 +8,13 @@ use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;
+use Illuminate\Queue\Middleware\WithoutOverlapping;
 use App\Services\AccountService;
 use App\Services\FollowerService;
 use Cache;
 use DB;
+use Storage;
+use App\Follower;
 use App\Profile;
 
 class FollowServiceWarmCache implements ShouldQueue
@@ -23,6 +26,16 @@ class FollowServiceWarmCache implements ShouldQueue
 	public $timeout = 5000;
 	public $failOnTimeout = false;
 
+    /**
+     * Get the middleware the job should pass through.
+     *
+     * @return array<int, object>
+     */
+    public function middleware(): array
+    {
+        return [(new WithoutOverlapping($this->profileId))->dontRelease()];
+    }
+
 	/**
 	 * Create a new job instance.
 	 *
@@ -42,6 +55,10 @@ class FollowServiceWarmCache implements ShouldQueue
 	{
 		$id = $this->profileId;
 
+        if(Cache::has(FollowerService::FOLLOWERS_SYNC_KEY . $id) && Cache::has(FollowerService::FOLLOWING_SYNC_KEY . $id)) {
+            return;
+        }
+
 		$account = AccountService::get($id, true);
 
 		if(!$account) {
@@ -50,25 +67,43 @@ class FollowServiceWarmCache implements ShouldQueue
 			return;
 		}
 
-		DB::table('followers')
-			->select('id', 'following_id', 'profile_id')
-			->whereFollowingId($id)
-			->orderBy('id')
-			->chunk(200, function($followers) use($id) {
-			foreach($followers as $follow) {
-				FollowerService::add($follow->profile_id, $id);
-			}
-		});
-
-		DB::table('followers')
-			->select('id', 'following_id', 'profile_id')
-			->whereProfileId($id)
-			->orderBy('id')
-			->chunk(200, function($followers) use($id) {
-			foreach($followers as $follow) {
-				FollowerService::add($id, $follow->following_id);
-			}
-		});
+        $hasFollowerPostProcessing = false;
+        $hasFollowingPostProcessing = false;
+
+        if(Follower::whereProfileId($id)->orWhere('following_id', $id)->count()) {
+            $following = [];
+            $followers = [];
+    		foreach(Follower::lazy() as $follow) {
+                if($follow->following_id != $id && $follow->profile_id != $id) {
+                    continue;
+                }
+                if($follow->profile_id == $id) {
+                    $following[] = $follow->following_id;
+                } else {
+                    $followers[] = $follow->profile_id;
+                }
+            }
+
+            if(count($followers) > 100) {
+                // store follower ids and process in another job
+                Storage::put('follow-warm-cache/' . $id . '/followers.json', json_encode($followers));
+                $hasFollowerPostProcessing = true;
+            } else {
+                foreach($followers as $follower) {
+                    FollowerService::add($follower, $id);
+                }
+            }
+
+            if(count($following) > 100) {
+                // store following ids and process in another job
+                Storage::put('follow-warm-cache/' . $id . '/following.json', json_encode($following));
+                $hasFollowingPostProcessing = true;
+            } else {
+                foreach($following as $following) {
+                    FollowerService::add($id, $following);
+                }
+            }
+        }
 
 		Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1, 604800);
 		Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1, 604800);
@@ -82,6 +117,14 @@ class FollowServiceWarmCache implements ShouldQueue
 
 		AccountService::del($id);
 
+        if($hasFollowingPostProcessing) {
+            FollowServiceWarmCacheLargeIngestPipeline::dispatch($id, 'following')->onQueue('follow');
+        }
+
+        if($hasFollowerPostProcessing) {
+            FollowServiceWarmCacheLargeIngestPipeline::dispatch($id, 'followers')->onQueue('follow');
+        }
+
 		return;
 	}
 }

+ 88 - 0
app/Jobs/FollowPipeline/FollowServiceWarmCacheLargeIngestPipeline.php

@@ -0,0 +1,88 @@
+<?php
+
+namespace App\Jobs\FollowPipeline;
+
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldBeUnique;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use App\Services\AccountService;
+use App\Services\FollowerService;
+use Cache;
+use DB;
+use Storage;
+use App\Follower;
+use App\Profile;
+
+class FollowServiceWarmCacheLargeIngestPipeline implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    public $profileId;
+    public $followType;
+    public $tries = 5;
+    public $timeout = 5000;
+    public $failOnTimeout = false;
+
+    /**
+     * Create a new job instance.
+     *
+     * @return void
+     */
+    public function __construct($profileId, $followType = 'following')
+    {
+        $this->profileId = $profileId;
+        $this->followType = $followType;
+    }
+
+    /**
+     * Execute the job.
+     *
+     * @return void
+     */
+    public function handle()
+    {
+        $pid = $this->profileId;
+        $type = $this->followType;
+
+        if($type === 'followers') {
+            $key = 'follow-warm-cache/' . $pid . '/followers.json';
+            if(!Storage::exists($key)) {
+                return;
+            }
+            $file = Storage::get($key);
+            $json = json_decode($file, true);
+
+            foreach($json as $id) {
+                FollowerService::add($id, $pid, false);
+                usleep(random_int(500, 3000));
+            }
+            sleep(5);
+            Storage::delete($key);
+        }
+
+        if($type === 'following') {
+            $key = 'follow-warm-cache/' . $pid . '/following.json';
+            if(!Storage::exists($key)) {
+                return;
+            }
+            $file = Storage::get($key);
+            $json = json_decode($file, true);
+
+            foreach($json as $id) {
+                FollowerService::add($pid, $id, false);
+                usleep(random_int(500, 3000));
+            }
+            sleep(5);
+            Storage::delete($key);
+        }
+
+        sleep(random_int(2, 5));
+        $files = Storage::files('follow-warm-cache/' . $pid);
+        if(empty($files)) {
+            Storage::deleteDirectory('follow-warm-cache/' . $pid);
+        }
+    }
+}

+ 6 - 2
app/Services/FollowerService.php

@@ -20,10 +20,14 @@ class FollowerService
 	const FOLLOWING_KEY = 'pf:services:follow:following:id:';
 	const FOLLOWERS_KEY = 'pf:services:follow:followers:id:';
 
-	public static function add($actor, $target)
+	public static function add($actor, $target, $refresh = true)
 	{
 		$ts = (int) microtime(true);
-		RelationshipService::refresh($actor, $target);
+        if($refresh) {
+          RelationshipService::refresh($actor, $target);
+        } else {
+		  RelationshipService::forget($actor, $target);
+        }
 		Redis::zadd(self::FOLLOWING_KEY . $actor, $ts, $target);
 		Redis::zadd(self::FOLLOWERS_KEY . $target, $ts, $actor);
 		Cache::forget('profile:following:' . $actor);

+ 8 - 0
app/Services/RelationshipService.php

@@ -66,6 +66,14 @@ class RelationshipService
 		return self::get($aid, $tid);
 	}
 
+	public static function forget($aid, $tid)
+	{
+		Cache::forget('pf:services:follower:audience:' . $aid);
+		Cache::forget('pf:services:follower:audience:' . $tid);
+		self::delete($tid, $aid);
+		self::delete($aid, $tid);
+	}
+
 	public static function defaultRelation($tid)
 	{
 		return [

+ 42 - 0
database/migrations/2021_08_04_095125_create_groups_table.php

@@ -0,0 +1,42 @@
+<?php
+
+use Illuminate\Database\Migrations\Migration;
+use Illuminate\Database\Schema\Blueprint;
+use Illuminate\Support\Facades\Schema;
+
+class CreateGroupsTable extends Migration
+{
+    /**
+     * Run the migrations.
+     *
+     * @return void
+     */
+    public function up()
+    {
+        Schema::create('groups', function (Blueprint $table) {
+            $table->bigInteger('id')->unsigned()->primary();
+            $table->bigInteger('profile_id')->unsigned()->nullable()->index();
+            $table->string('status')->nullable()->index();
+            $table->string('name')->nullable();
+            $table->text('description')->nullable();
+            $table->text('rules')->nullable();
+            $table->boolean('local')->default(true)->index();
+            $table->string('remote_url')->nullable();
+            $table->string('inbox_url')->nullable();
+            $table->boolean('is_private')->default(false);
+            $table->boolean('local_only')->default(false);
+            $table->json('metadata')->nullable();
+            $table->timestamps();
+        });
+    }
+
+    /**
+     * Reverse the migrations.
+     *
+     * @return void
+     */
+    public function down()
+    {
+        Schema::dropIfExists('groups');
+    }
+}

+ 40 - 0
database/migrations/2021_08_04_095143_create_group_members_table.php

@@ -0,0 +1,40 @@
+<?php
+
+use Illuminate\Database\Migrations\Migration;
+use Illuminate\Database\Schema\Blueprint;
+use Illuminate\Support\Facades\Schema;
+
+class CreateGroupMembersTable extends Migration
+{
+	/**
+	 * Run the migrations.
+	 *
+	 * @return void
+	 */
+	public function up()
+	{
+		Schema::create('group_members', function (Blueprint $table) {
+			$table->id();
+			$table->bigInteger('group_id')->unsigned()->index();
+			$table->bigInteger('profile_id')->unsigned()->index();
+			$table->string('role')->default('member')->index();
+			$table->boolean('local_group')->default(false)->index();
+			$table->boolean('local_profile')->default(false)->index();
+			$table->boolean('join_request')->default(false)->index();
+			$table->timestamp('approved_at')->nullable();
+			$table->timestamp('rejected_at')->nullable();
+			$table->unique(['group_id', 'profile_id']);
+			$table->timestamps();
+		});
+	}
+
+	/**
+	 * Reverse the migrations.
+	 *
+	 * @return void
+	 */
+	public function down()
+	{
+		Schema::dropIfExists('group_members');
+	}
+}

+ 42 - 0
database/migrations/2021_08_04_095238_create_group_posts_table.php

@@ -0,0 +1,42 @@
+<?php
+
+use Illuminate\Database\Migrations\Migration;
+use Illuminate\Database\Schema\Blueprint;
+use Illuminate\Support\Facades\Schema;
+
+class CreateGroupPostsTable extends Migration
+{
+	/**
+	 * Run the migrations.
+	 *
+	 * @return void
+	 */
+	public function up()
+	{
+		Schema::create('group_posts', function (Blueprint $table) {
+			$table->bigInteger('id')->unsigned()->primary();
+			$table->bigInteger('group_id')->unsigned()->index();
+			$table->bigInteger('profile_id')->unsigned()->nullable()->index();
+			$table->string('type')->nullable()->index();
+			$table->bigInteger('status_id')->unsigned()->unique();
+			$table->string('remote_url')->unique()->nullable()->index();
+			$table->bigInteger('reply_child_id')->unsigned()->nullable();
+			$table->bigInteger('in_reply_to_id')->unsigned()->nullable();
+			$table->bigInteger('reblog_of_id')->unsigned()->nullable();
+			$table->unsignedInteger('reply_count')->nullable();
+			$table->string('status')->nullable()->index();
+			$table->json('metadata')->nullable();
+			$table->timestamps();
+		});
+	}
+
+	/**
+	 * Reverse the migrations.
+	 *
+	 * @return void
+	 */
+	public function down()
+	{
+		Schema::dropIfExists('group_posts');
+	}
+}

+ 38 - 0
database/migrations/2021_08_16_072457_create_group_invitations_table.php

@@ -0,0 +1,38 @@
+<?php
+
+use Illuminate\Database\Migrations\Migration;
+use Illuminate\Database\Schema\Blueprint;
+use Illuminate\Support\Facades\Schema;
+
+class CreateGroupInvitationsTable extends Migration
+{
+	/**
+	 * Run the migrations.
+	 *
+	 * @return void
+	 */
+	public function up()
+	{
+		Schema::create('group_invitations', function (Blueprint $table) {
+			$table->bigIncrements('id');
+			$table->bigInteger('group_id')->unsigned()->index();
+			$table->bigInteger('from_profile_id')->unsigned()->index();
+			$table->bigInteger('to_profile_id')->unsigned()->index();
+			$table->string('role')->nullable();
+			$table->boolean('to_local')->default(true)->index();
+			$table->boolean('from_local')->default(true)->index();
+			$table->unique(['group_id', 'to_profile_id']);
+			$table->timestamps();
+		});
+	}
+
+	/**
+	 * Reverse the migrations.
+	 *
+	 * @return void
+	 */
+	public function down()
+	{
+		Schema::dropIfExists('group_invitations');
+	}
+}