소스 검색

Update FollowServiceWarmCache, improve handling larger following/follower lists

Daniel Supernault 1 년 전
부모
커밋
61a6d90403
2개의 변경된 파일150개의 추가작업 그리고 19개의 파일을 삭제
  1. 62 19
      app/Jobs/FollowPipeline/FollowServiceWarmCache.php
  2. 88 0
      app/Jobs/FollowPipeline/FollowServiceWarmCacheLargeIngestPipeline.php

+ 62 - 19
app/Jobs/FollowPipeline/FollowServiceWarmCache.php

@@ -8,10 +8,13 @@ use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;
 use Illuminate\Queue\SerializesModels;
+use Illuminate\Queue\Middleware\WithoutOverlapping;
 use App\Services\AccountService;
 use App\Services\AccountService;
 use App\Services\FollowerService;
 use App\Services\FollowerService;
 use Cache;
 use Cache;
 use DB;
 use DB;
+use Storage;
+use App\Follower;
 use App\Profile;
 use App\Profile;
 
 
 class FollowServiceWarmCache implements ShouldQueue
 class FollowServiceWarmCache implements ShouldQueue
@@ -23,6 +26,16 @@ class FollowServiceWarmCache implements ShouldQueue
 	public $timeout = 5000;
 	public $timeout = 5000;
 	public $failOnTimeout = false;
 	public $failOnTimeout = false;
 
 
+    /**
+     * Get the middleware the job should pass through.
+     *
+     * @return array<int, object>
+     */
+    public function middleware(): array
+    {
+        return [(new WithoutOverlapping($this->profileId))->dontRelease()];
+    }
+
 	/**
 	/**
 	 * Create a new job instance.
 	 * Create a new job instance.
 	 *
 	 *
@@ -42,6 +55,10 @@ class FollowServiceWarmCache implements ShouldQueue
 	{
 	{
 		$id = $this->profileId;
 		$id = $this->profileId;
 
 
+        if(Cache::has(FollowerService::FOLLOWERS_SYNC_KEY . $id) && Cache::has(FollowerService::FOLLOWING_SYNC_KEY . $id)) {
+            return;
+        }
+
 		$account = AccountService::get($id, true);
 		$account = AccountService::get($id, true);
 
 
 		if(!$account) {
 		if(!$account) {
@@ -50,25 +67,43 @@ class FollowServiceWarmCache implements ShouldQueue
 			return;
 			return;
 		}
 		}
 
 
-		DB::table('followers')
-			->select('id', 'following_id', 'profile_id')
-			->whereFollowingId($id)
-			->orderBy('id')
-			->chunk(200, function($followers) use($id) {
-			foreach($followers as $follow) {
-				FollowerService::add($follow->profile_id, $id);
-			}
-		});
-
-		DB::table('followers')
-			->select('id', 'following_id', 'profile_id')
-			->whereProfileId($id)
-			->orderBy('id')
-			->chunk(200, function($followers) use($id) {
-			foreach($followers as $follow) {
-				FollowerService::add($id, $follow->following_id);
-			}
-		});
+        $hasFollowerPostProcessing = false;
+        $hasFollowingPostProcessing = false;
+
+        if(Follower::whereProfileId($id)->orWhere('following_id', $id)->count()) {
+            $following = [];
+            $followers = [];
+    		foreach(Follower::lazy() as $follow) {
+                if($follow->following_id != $id && $follow->profile_id != $id) {
+                    continue;
+                }
+                if($follow->profile_id == $id) {
+                    $following[] = $follow->following_id;
+                } else {
+                    $followers[] = $follow->profile_id;
+                }
+            }
+
+            if(count($followers) > 100) {
+                // store follower ids and process in another job
+                Storage::put('follow-warm-cache/' . $id . '/followers.json', json_encode($followers));
+                $hasFollowerPostProcessing = true;
+            } else {
+                foreach($followers as $follower) {
+                    FollowerService::add($follower, $id);
+                }
+            }
+
+            if(count($following) > 100) {
+                // store following ids and process in another job
+                Storage::put('follow-warm-cache/' . $id . '/following.json', json_encode($following));
+                $hasFollowingPostProcessing = true;
+            } else {
+                foreach($following as $following) {
+                    FollowerService::add($id, $following);
+                }
+            }
+        }
 
 
 		Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1, 604800);
 		Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1, 604800);
 		Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1, 604800);
 		Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1, 604800);
@@ -82,6 +117,14 @@ class FollowServiceWarmCache implements ShouldQueue
 
 
 		AccountService::del($id);
 		AccountService::del($id);
 
 
+        if($hasFollowingPostProcessing) {
+            FollowServiceWarmCacheLargeIngestPipeline::dispatch($id, 'following')->onQueue('follow');
+        }
+
+        if($hasFollowerPostProcessing) {
+            FollowServiceWarmCacheLargeIngestPipeline::dispatch($id, 'followers')->onQueue('follow');
+        }
+
 		return;
 		return;
 	}
 	}
 }
 }

+ 88 - 0
app/Jobs/FollowPipeline/FollowServiceWarmCacheLargeIngestPipeline.php

@@ -0,0 +1,88 @@
+<?php
+
+namespace App\Jobs\FollowPipeline;
+
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldBeUnique;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use App\Services\AccountService;
+use App\Services\FollowerService;
+use Cache;
+use DB;
+use Storage;
+use App\Follower;
+use App\Profile;
+
+class FollowServiceWarmCacheLargeIngestPipeline implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    public $profileId;
+    public $followType;
+    public $tries = 5;
+    public $timeout = 5000;
+    public $failOnTimeout = false;
+
+    /**
+     * Create a new job instance.
+     *
+     * @return void
+     */
+    public function __construct($profileId, $followType = 'following')
+    {
+        $this->profileId = $profileId;
+        $this->followType = $followType;
+    }
+
+    /**
+     * Execute the job.
+     *
+     * @return void
+     */
+    public function handle()
+    {
+        $pid = $this->profileId;
+        $type = $this->followType;
+
+        if($type === 'followers') {
+            $key = 'follow-warm-cache/' . $pid . '/followers.json';
+            if(!Storage::exists($key)) {
+                return;
+            }
+            $file = Storage::get($key);
+            $json = json_decode($file, true);
+
+            foreach($json as $id) {
+                FollowerService::add($id, $pid, false);
+                usleep(random_int(500, 3000));
+            }
+            sleep(5);
+            Storage::delete($key);
+        }
+
+        if($type === 'following') {
+            $key = 'follow-warm-cache/' . $pid . '/following.json';
+            if(!Storage::exists($key)) {
+                return;
+            }
+            $file = Storage::get($key);
+            $json = json_decode($file, true);
+
+            foreach($json as $id) {
+                FollowerService::add($pid, $id, false);
+                usleep(random_int(500, 3000));
+            }
+            sleep(5);
+            Storage::delete($key);
+        }
+
+        sleep(random_int(2, 5));
+        $files = Storage::files('follow-warm-cache/' . $pid);
+        if(empty($files)) {
+            Storage::deleteDirectory('follow-warm-cache/' . $pid);
+        }
+    }
+}