浏览代码

Update federation pipeline, add locks

Daniel Supernault 4 年之前
父节点
当前提交
ddc768871b
共有 3 个文件被更改,包括 75 次插入40 次删除
  1. 9 0
      app/Jobs/InboxPipeline/InboxValidator.php
  2. 9 0
      app/Jobs/InboxPipeline/InboxWorker.php
  3. 57 40
      app/Util/ActivityPub/Helpers.php

+ 9 - 0
app/Jobs/InboxPipeline/InboxValidator.php

@@ -53,6 +53,15 @@ class InboxValidator implements ShouldQueue
 
         $profile = Profile::whereNull('domain')->whereUsername($username)->first();
 
+        if(isset($payload['id'])) {
+            $lockKey = hash('sha256', $payload['id']);
+            if(Cache::get($lockKey) !== null) {
+                // Job processed already
+                return 1;
+            }
+            Cache::put($lockKey, 1, 300);
+        }
+
         if(!isset($headers['signature']) || !isset($headers['date'])) {
             return;
         }

+ 9 - 0
app/Jobs/InboxPipeline/InboxWorker.php

@@ -49,6 +49,15 @@ class InboxWorker implements ShouldQueue
         $headers = $this->headers;
         $payload = json_decode($this->payload, true, 8);
 
+        if(isset($payload['id'])) {
+            $lockKey = hash('sha256', $payload['id']);
+            if(Cache::get($lockKey) !== null) {
+                // Job processed already
+                return 1;
+            }
+            Cache::put($lockKey, 1, 300);
+        }
+
         if(!isset($headers['signature']) || !isset($headers['date'])) {
             return;
         }

+ 57 - 40
app/Util/ActivityPub/Helpers.php

@@ -346,27 +346,41 @@ class Helpers {
 			$reply_to = null;
 		}
 		$ts = is_array($res['published']) ? $res['published'][0] : $res['published'];
-		$status = DB::transaction(function() use($profile, $res, $url, $ts, $reply_to, $cw, $scope, $id) {
-			$status = new Status;
-			$status->profile_id = $profile->id;
-			$status->url = isset($res['url']) ? $res['url'] : $url;
-			$status->uri = isset($res['url']) ? $res['url'] : $url;
-			$status->object_url = $id;
-			$status->caption = strip_tags($res['content']);
-			$status->rendered = Purify::clean($res['content']);
-			$status->created_at = Carbon::parse($ts);
-			$status->in_reply_to_id = $reply_to;
-			$status->local = false;
-			$status->is_nsfw = $cw;
-			$status->scope = $scope;
-			$status->visibility = $scope;
-			$status->cw_summary = $cw == true && isset($res['summary']) ?
-				Purify::clean(strip_tags($res['summary'])) : null;
-			$status->save();
-			if($reply_to == null) {
-				self::importNoteAttachment($res, $status);
-			}
-			return $status;
+
+		$statusLockKey = 'helpers:status-lock:' . hash('sha256', $res['id']);
+		$status = Cache::lock($statusLockKey)
+			->get(function () use(
+				$profile, 
+				$res, 
+				$url, 
+				$ts, 
+				$reply_to, 
+				$cw, 
+				$scope, 
+				$id
+		) {
+			return DB::transaction(function() use($profile, $res, $url, $ts, $reply_to, $cw, $scope, $id) {
+				$status = new Status;
+				$status->profile_id = $profile->id;
+				$status->url = isset($res['url']) ? $res['url'] : $url;
+				$status->uri = isset($res['url']) ? $res['url'] : $url;
+				$status->object_url = $id;
+				$status->caption = strip_tags($res['content']);
+				$status->rendered = Purify::clean($res['content']);
+				$status->created_at = Carbon::parse($ts);
+				$status->in_reply_to_id = $reply_to;
+				$status->local = false;
+				$status->is_nsfw = $cw;
+				$status->scope = $scope;
+				$status->visibility = $scope;
+				$status->cw_summary = $cw == true && isset($res['summary']) ?
+					Purify::clean(strip_tags($res['summary'])) : null;
+				$status->save();
+				if($reply_to == null) {
+					self::importNoteAttachment($res, $status);
+				}
+				return $status;
+			});
 		});
 
 		return $status;
@@ -458,25 +472,28 @@ class Helpers {
 
 			$profile = Profile::whereRemoteUrl($res['id'])->first();
 			if(!$profile) {
-				$profile = DB::transaction(function() use($domain, $webfinger, $res, $runJobs) {
-					$profile = new Profile();
-					$profile->domain = strtolower($domain);
-					$profile->username = strtolower(Purify::clean($webfinger));
-					$profile->name = isset($res['name']) ? Purify::clean($res['name']) : 'user';
-					$profile->bio = isset($res['summary']) ? Purify::clean($res['summary']) : null;
-					$profile->sharedInbox = isset($res['endpoints']) && isset($res['endpoints']['sharedInbox']) ? $res['endpoints']['sharedInbox'] : null;
-					$profile->inbox_url = strtolower($res['inbox']);
-					$profile->outbox_url = strtolower($res['outbox']);
-					$profile->remote_url = strtolower($res['id']);
-					$profile->public_key = $res['publicKey']['publicKeyPem'];
-					$profile->key_id = $res['publicKey']['id'];
-					$profile->webfinger = strtolower(Purify::clean($webfinger));
-					$profile->last_fetched_at = now();
-					$profile->save();
-					if(config('pixelfed.cloud_storage') == true) {
-						RemoteAvatarFetch::dispatch($profile);
-					}
-					return $profile;
+				$profileLockKey = 'helpers:profile-lock:' . hash('sha256', $res['id']);
+				$profile = Cache::lock($profileLockKey)->get(function () use($domain, $webfinger, $res, $runJobs) {
+					return DB::transaction(function() use($domain, $webfinger, $res, $runJobs) {
+						$profile = new Profile();
+						$profile->domain = strtolower($domain);
+						$profile->username = strtolower(Purify::clean($webfinger));
+						$profile->name = isset($res['name']) ? Purify::clean($res['name']) : 'user';
+						$profile->bio = isset($res['summary']) ? Purify::clean($res['summary']) : null;
+						$profile->sharedInbox = isset($res['endpoints']) && isset($res['endpoints']['sharedInbox']) ? $res['endpoints']['sharedInbox'] : null;
+						$profile->inbox_url = strtolower($res['inbox']);
+						$profile->outbox_url = strtolower($res['outbox']);
+						$profile->remote_url = strtolower($res['id']);
+						$profile->public_key = $res['publicKey']['publicKeyPem'];
+						$profile->key_id = $res['publicKey']['id'];
+						$profile->webfinger = strtolower(Purify::clean($webfinger));
+						$profile->last_fetched_at = now();
+						$profile->save();
+						if(config('pixelfed.cloud_storage') == true) {
+							RemoteAvatarFetch::dispatch($profile);
+						}
+						return $profile;
+					});
 				});
 			} else {
 				// Update info after 24 hours