|
@@ -4,12 +4,9 @@ namespace App\Jobs\MovePipeline;
|
|
|
|
|
|
use App\Follower;
|
|
use App\Follower;
|
|
use App\Util\ActivityPub\Helpers;
|
|
use App\Util\ActivityPub\Helpers;
|
|
-use App\Util\ActivityPub\HttpSignature;
|
|
|
|
use DateTime;
|
|
use DateTime;
|
|
use DB;
|
|
use DB;
|
|
use Exception;
|
|
use Exception;
|
|
-use GuzzleHttp\Client;
|
|
|
|
-use GuzzleHttp\Pool;
|
|
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Foundation\Queue\Queueable;
|
|
use Illuminate\Foundation\Queue\Queueable;
|
|
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
|
|
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
|
|
@@ -116,52 +113,19 @@ class MoveMigrateFollowersPipeline implements ShouldQueue
|
|
->whereNotNull('profiles.user_id')
|
|
->whereNotNull('profiles.user_id')
|
|
->whereNull('profiles.deleted_at')
|
|
->whereNull('profiles.deleted_at')
|
|
->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key', 'profiles.status')
|
|
->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key', 'profiles.status')
|
|
- ->chunkById(100, function ($followers) use ($addlHeaders, $targetInbox, $targetPid, $target) {
|
|
|
|
- $client = new Client([
|
|
|
|
- 'timeout' => config('federation.activitypub.delivery.timeout'),
|
|
|
|
- ]);
|
|
|
|
- $requests = function ($followers) use ($client, $target, $addlHeaders, $targetInbox, $targetPid) {
|
|
|
|
- $activity = [
|
|
|
|
- '@context' => 'https://www.w3.org/ns/activitystreams',
|
|
|
|
- 'type' => 'Follow',
|
|
|
|
- 'actor' => null,
|
|
|
|
- 'object' => $target,
|
|
|
|
- ];
|
|
|
|
- foreach ($followers as $follower) {
|
|
|
|
- if (! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username;
|
|
|
|
- $activity['actor'] = $permalink;
|
|
|
|
- $keyId = $permalink.'#main-key';
|
|
|
|
- $payload = json_encode($activity);
|
|
|
|
- $url = $targetInbox;
|
|
|
|
- $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders);
|
|
|
|
- Follower::updateOrCreate([
|
|
|
|
- 'profile_id' => $follower->id,
|
|
|
|
- 'following_id' => $targetPid,
|
|
|
|
- ]);
|
|
|
|
- yield function () use ($client, $url, $headers, $payload) {
|
|
|
|
- return $client->postAsync($url, [
|
|
|
|
- 'curl' => [
|
|
|
|
- CURLOPT_HTTPHEADER => $headers,
|
|
|
|
- CURLOPT_POSTFIELDS => $payload,
|
|
|
|
- CURLOPT_HEADER => true,
|
|
|
|
- ],
|
|
|
|
- ]);
|
|
|
|
- };
|
|
|
|
|
|
+ ->chunkById(100, function ($followers) use ($targetInbox, $targetPid, $target) {
|
|
|
|
+ foreach ($followers as $follower) {
|
|
|
|
+ if (! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') {
|
|
|
|
+ continue;
|
|
}
|
|
}
|
|
- };
|
|
|
|
|
|
|
|
- $pool = new Pool($client, $requests($followers), [
|
|
|
|
- 'concurrency' => config('federation.activitypub.delivery.concurrency'),
|
|
|
|
- 'fulfilled' => function ($response, $index) {},
|
|
|
|
- 'rejected' => function ($reason, $index) {},
|
|
|
|
- ]);
|
|
|
|
|
|
+ Follower::updateOrCreate([
|
|
|
|
+ 'profile_id' => $follower->id,
|
|
|
|
+ 'following_id' => $targetPid,
|
|
|
|
+ ]);
|
|
|
|
|
|
- $promise = $pool->promise();
|
|
|
|
-
|
|
|
|
- $promise->wait();
|
|
|
|
|
|
+ MoveSendFollowPipeline::dispatch($follower, $targetInbox, $targetPid, $target)->onQueue('follow');
|
|
|
|
+ }
|
|
}, 'id');
|
|
}, 'id');
|
|
}
|
|
}
|
|
}
|
|
}
|