StoryIndexService.php 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668
  1. <?php
  2. namespace App\Services;
  3. use Illuminate\Support\Arr;
  4. use Illuminate\Support\Facades\DB;
  5. use Illuminate\Support\Facades\Log;
  6. use Illuminate\Support\Facades\Redis;
  7. use Illuminate\Support\Facades\Storage;
  8. class StoryIndexService
  9. {
  10. public const STORY_TTL = 86400;
  11. private const REBUILD_LOCK_TTL = 300;
  12. private function authorKey($authorId)
  13. {
  14. return "story:by_author:{$authorId}";
  15. }
  16. private function storyKey($storyId)
  17. {
  18. return "story:{$storyId}";
  19. }
  20. private function seenKey($viewer, $author)
  21. {
  22. return "story:seen:{$viewer}:{$author}";
  23. }
  24. private function rebuildLockKey()
  25. {
  26. return 'story:rebuilding';
  27. }
  28. /**
  29. * Safely convert Redis result to integer, handling both predis and phpredis
  30. */
  31. private function redisToInt($value): int
  32. {
  33. if (is_int($value)) {
  34. return $value;
  35. }
  36. if (is_string($value) || is_numeric($value)) {
  37. return (int) $value;
  38. }
  39. // Handle phpredis object returns
  40. if (is_object($value) && method_exists($value, '__toString')) {
  41. return (int) $value->__toString();
  42. }
  43. // Fallback for unexpected types
  44. return 0;
  45. }
  46. /**
  47. * Safely convert Redis result to boolean
  48. */
  49. private function redisToBool($value): bool
  50. {
  51. if (is_bool($value)) {
  52. return $value;
  53. }
  54. if (is_int($value)) {
  55. return $value > 0;
  56. }
  57. if (is_string($value)) {
  58. return $value === '1' || strtolower($value) === 'true';
  59. }
  60. // Handle phpredis object returns
  61. if (is_object($value) && method_exists($value, '__toString')) {
  62. $str = $value->__toString();
  63. return $str === '1' || strtolower($str) === 'true';
  64. }
  65. return false;
  66. }
  67. /**
  68. * Safely execute Redis commands that return integers
  69. */
  70. private function redisInt(callable $command): int
  71. {
  72. return $this->redisToInt($command());
  73. }
  74. /**
  75. * Safely execute Redis commands that return booleans
  76. */
  77. private function redisBool(callable $command): bool
  78. {
  79. return $this->redisToBool($command());
  80. }
  81. /**
  82. * Safely convert Redis result to array, handling both predis and phpredis
  83. */
  84. private function redisToArray($value): array
  85. {
  86. if (is_array($value)) {
  87. return $value;
  88. }
  89. if (is_null($value)) {
  90. return [];
  91. }
  92. // Handle phpredis object returns that might be iterable
  93. if (is_object($value)) {
  94. if (method_exists($value, 'toArray')) {
  95. return $value->toArray();
  96. }
  97. if ($value instanceof \Iterator || $value instanceof \IteratorAggregate) {
  98. return iterator_to_array($value);
  99. }
  100. if (method_exists($value, '__toString')) {
  101. $str = $value->__toString();
  102. return $str ? [$str] : [];
  103. }
  104. }
  105. // Handle single values
  106. if (is_string($value) || is_numeric($value)) {
  107. return [$value];
  108. }
  109. return [];
  110. }
  111. /**
  112. * Safely execute Redis commands that return arrays
  113. */
  114. private function redisArray(callable $command): array
  115. {
  116. return $this->redisToArray($command());
  117. }
  118. public function indexStory($story): void
  119. {
  120. if (! $story->active) {
  121. $this->removeStory($story->id, $story->profile_id);
  122. return;
  123. }
  124. $author = (string) $story->profile_id;
  125. $sid = (string) $story->id;
  126. $createdAt = $story->created_at;
  127. $score = $createdAt->getTimestamp();
  128. $expiresAt = $story->expires_at;
  129. $ttl = (int) max(0, $expiresAt->getTimestamp() - time());
  130. if ($ttl <= 0) {
  131. $this->removeStory($sid, $author);
  132. return;
  133. }
  134. $score = (float) $createdAt->getTimestamp();
  135. $duration = (int) ($story->duration ?? 0);
  136. $overlays = $story->story ? json_encode(data_get($story->story, 'overlays', [])) : '[]';
  137. $viewCount = (int) ($story->view_count ?? 0);
  138. $createdIso = $createdAt->toIso8601String();
  139. $type = $story->type;
  140. $path = $story->path;
  141. Redis::pipeline(function ($pipe) use (
  142. $author, $sid, $score, $ttl, $duration, $overlays, $viewCount, $createdIso, $type, $path
  143. ) {
  144. $keyStory = $this->storyKey($sid);
  145. $keyAuth = $this->authorKey($author);
  146. $pipe->hset($keyStory, 'id', $sid);
  147. $pipe->hset($keyStory, 'profile_id', (string) $author);
  148. $pipe->hset($keyStory, 'type', (string) $type);
  149. $pipe->hset($keyStory, 'path', (string) $path);
  150. $pipe->hset($keyStory, 'duration', (string) $duration);
  151. $pipe->hset($keyStory, 'overlays', $overlays);
  152. $pipe->hset($keyStory, 'created_at', $createdIso);
  153. $pipe->hset($keyStory, 'view_count', (string) $viewCount);
  154. $pipe->expire($keyStory, (int) $ttl);
  155. if (config('database.redis.client') === 'predis') {
  156. $pipe->zadd($keyAuth, [$sid => $score]);
  157. } else {
  158. $pipe->zadd($keyAuth, $score, $sid);
  159. }
  160. $pipe->sadd('story:active_authors', $author);
  161. $pipe->expire($keyAuth, (int) ($ttl + 3600));
  162. });
  163. }
  164. public function removeStory(string|int $storyId, string|int $authorId): void
  165. {
  166. $sid = (string) $storyId;
  167. $aid = (string) $authorId;
  168. Redis::pipeline(function ($pipe) use ($sid, $aid) {
  169. $pipe->zrem($this->authorKey($aid), $sid);
  170. $pipe->del($this->storyKey($sid));
  171. });
  172. if ($this->redisInt(fn () => Redis::zcard($this->authorKey($aid))) === 0) {
  173. Redis::srem('story:active_authors', $aid);
  174. }
  175. }
  176. public function markSeen(int $viewerId, int $authorId, int $storyId, \DateTimeInterface $storyCreatedAt): void
  177. {
  178. $key = $this->seenKey($viewerId, $authorId);
  179. Redis::sadd($key, (string) $storyId);
  180. $expiresAt = now()->parse($storyCreatedAt)->addSeconds(self::STORY_TTL);
  181. $secondsUntilExpiry = $expiresAt->getTimestamp() - time();
  182. $ttl = max(1, $secondsUntilExpiry);
  183. $currentTtl = $this->redisInt(fn () => Redis::ttl($key));
  184. if ($currentTtl < 0) {
  185. $currentTtl = 0;
  186. }
  187. $finalTtl = max($ttl, $currentTtl);
  188. Redis::expire($key, $finalTtl);
  189. }
  190. public function rebuildIndex(): array
  191. {
  192. $lockKey = $this->rebuildLockKey();
  193. $lockAcquired = Redis::set($lockKey, '1', 'EX', self::REBUILD_LOCK_TTL, 'NX');
  194. if (! $lockAcquired) {
  195. return ['status' => 'already_rebuilding', 'message' => 'Index rebuild already in progress'];
  196. }
  197. try {
  198. $stats = ['indexed' => 0, 'expired' => 0, 'errors' => 0, 'seen_rebuilt' => 0, 'seen_errors' => 0];
  199. $this->clearStoryCache();
  200. DB::table('stories')
  201. ->where('active', true)
  202. ->where('expires_at', '>', now())
  203. ->orderBy('profile_id')
  204. ->orderBy('created_at', 'desc')
  205. ->chunk(500, function ($stories) use (&$stats) {
  206. foreach ($stories as $story) {
  207. try {
  208. $storyObj = (object) [
  209. 'id' => $story->id,
  210. 'profile_id' => $story->profile_id,
  211. 'active' => $story->active,
  212. 'created_at' => now()->parse($story->created_at),
  213. 'expires_at' => now()->parse($story->expires_at),
  214. 'duration' => $story->duration,
  215. 'story' => $story->story ? json_decode($story->story, true) : null,
  216. 'view_count' => $story->view_count ?? 0,
  217. 'type' => $story->type,
  218. 'path' => $story->path,
  219. ];
  220. $this->indexStory($storyObj);
  221. $stats['indexed']++;
  222. } catch (\Throwable $e) {
  223. $stats['errors']++;
  224. if (config('app.dev_log')) {
  225. Log::error('Failed to index story during rebuild', [
  226. 'story_id' => $story->id,
  227. 'error' => $e->getMessage(),
  228. ]);
  229. }
  230. }
  231. }
  232. });
  233. $cutoffDate = now()->subHours(48);
  234. DB::table('story_views')
  235. ->join('stories', 'story_views.story_id', '=', 'stories.id')
  236. ->where('story_views.created_at', '>=', $cutoffDate)
  237. ->where('stories.active', true)
  238. ->where('stories.expires_at', '>', now())
  239. ->select(
  240. 'story_views.profile_id as viewer_id',
  241. 'story_views.story_id',
  242. 'stories.profile_id as author_id',
  243. 'stories.created_at as story_created_at',
  244. 'story_views.created_at as view_created_at'
  245. )
  246. ->orderBy('story_views.id')
  247. ->chunk(1000, function ($views) use (&$stats) {
  248. foreach ($views as $view) {
  249. try {
  250. $this->markSeen(
  251. (int) $view->viewer_id,
  252. (int) $view->author_id,
  253. (int) $view->story_id,
  254. now()->parse($view->story_created_at)
  255. );
  256. $stats['seen_rebuilt']++;
  257. } catch (\Throwable $e) {
  258. $stats['seen_errors']++;
  259. if (config('app.dev_log')) {
  260. Log::error('Failed to rebuild seen data during cache rebuild', [
  261. 'viewer_id' => $view->viewer_id,
  262. 'story_id' => $view->story_id,
  263. 'error' => $e->getMessage(),
  264. ]);
  265. }
  266. }
  267. }
  268. });
  269. return [
  270. 'status' => 'success',
  271. 'message' => 'Story index and seen data rebuilt successfully',
  272. 'stats' => $stats,
  273. ];
  274. } finally {
  275. Redis::del($lockKey);
  276. }
  277. }
  278. private function clearStoryCache(): void
  279. {
  280. $storyKeys = $this->redisArray(fn () => Redis::keys('story:*'));
  281. $storyKeys = array_filter($storyKeys, function ($key) {
  282. return ! str_contains($key, 'following:');
  283. });
  284. if (! empty($storyKeys)) {
  285. $chunks = array_chunk($storyKeys, 1000);
  286. foreach ($chunks as $chunk) {
  287. Redis::del(...$chunk);
  288. }
  289. }
  290. }
  291. private function ensureStoryCacheHealth(): bool
  292. {
  293. $activeCount = $this->redisInt(fn () => Redis::scard('story:active_authors'));
  294. if ($activeCount > 0) {
  295. return true;
  296. }
  297. $dbActiveCount = DB::table('stories')
  298. ->where('active', true)
  299. ->where('expires_at', '>', now())
  300. ->count();
  301. if ($dbActiveCount === 0) {
  302. return true;
  303. }
  304. if (config('app.dev_log')) {
  305. Log::info('Story cache appears empty, triggering rebuild', [
  306. 'db_active_stories' => $dbActiveCount,
  307. 'redis_active_authors' => $activeCount,
  308. ]);
  309. }
  310. $result = $this->rebuildIndex();
  311. return $result['status'] === 'success';
  312. }
  313. public function fetchCarouselNodes(int $viewerId, callable $profileHydrator): array
  314. {
  315. if (! $this->ensureStoryCacheHealth()) {
  316. if (config('app.dev_log')) {
  317. Log::warning('Failed to rebuild story cache, falling back to database query');
  318. }
  319. return $this->fetchCarouselNodesFromDatabase($viewerId, $profileHydrator);
  320. }
  321. $pid = (string) $viewerId;
  322. $opt = $this->withScoresOpt();
  323. if (! Redis::exists("following:{$pid}")) {
  324. $this->hydrateFollowingFromSql($viewerId);
  325. }
  326. $followingCount = $this->redisInt(fn () => Redis::scard("following:{$pid}"));
  327. $activeCount = $this->redisInt(fn () => Redis::scard('story:active_authors'));
  328. $authorIds = [];
  329. if ($followingCount > 1500) {
  330. $active = $this->redisArray(fn () => Redis::smembers('story:active_authors'));
  331. if ($active) {
  332. $results = Redis::pipeline(function ($pipe) use ($active, $pid) {
  333. foreach ($active as $aid) {
  334. $pipe->sismember("following:{$pid}", $aid);
  335. }
  336. });
  337. if (is_array($results) && count($results) === count($active)) {
  338. foreach ($active as $i => $aid) {
  339. if ($this->redisToBool($results[$i] ?? false)) {
  340. $authorIds[] = $aid;
  341. }
  342. }
  343. } else {
  344. $authorIds = array_filter($active, fn ($aid) => $this->redisBool(fn () => Redis::sismember("following:{$pid}", $aid)));
  345. }
  346. }
  347. } else {
  348. $authorIds = $this->redisArray(fn () => Redis::sinter("following:{$pid}", 'story:active_authors'));
  349. }
  350. if ($this->redisInt(fn () => Redis::zcard($this->authorKey($pid))) > 0) {
  351. array_unshift($authorIds, $pid);
  352. $authorIds = array_values(array_unique($authorIds));
  353. }
  354. if (empty($authorIds)) {
  355. return [];
  356. }
  357. $responses = $this->redisArray(fn () => Redis::pipeline(function ($pipe) use ($authorIds, $opt) {
  358. foreach ($authorIds as $aid) {
  359. $pipe->zrevrange("story:by_author:{$aid}", 0, -1, $opt);
  360. }
  361. }));
  362. $authorToStoryIds = [];
  363. $authorLatestTs = [];
  364. foreach ($authorIds as $i => $aid) {
  365. $withScores = $responses[$i] ?? [];
  366. // Ensure withScores is also an array
  367. $withScores = $this->redisToArray($withScores);
  368. $authorToStoryIds[$aid] = array_keys($withScores);
  369. $authorLatestTs[$aid] = $withScores ? (float) array_values($withScores)[0] : 0.0;
  370. }
  371. $allStoryIds = [];
  372. foreach ($authorToStoryIds as $list) {
  373. $allStoryIds = array_merge($allStoryIds, $list);
  374. }
  375. $allStoryIds = array_values(array_unique($allStoryIds));
  376. $storyMap = [];
  377. foreach ($allStoryIds as $sid) {
  378. $h = $this->redisArray(fn () => Redis::hgetall($this->storyKey($sid)));
  379. if (! empty($h)) {
  380. $storyMap[$sid] = $h;
  381. }
  382. }
  383. $seenCache = [];
  384. foreach ($authorIds as $aid) {
  385. $seenCache[$aid] = $this->redisArray(fn () => Redis::smembers($this->seenKey($viewerId, (int) $aid)));
  386. $seenCache[$aid] = array_flip($seenCache[$aid]);
  387. }
  388. // Ensure $authorIds is always an array before using array_map
  389. $authorIds = $this->redisToArray($authorIds);
  390. $profiles = $profileHydrator(array_map('intval', $authorIds));
  391. $nodes = [];
  392. foreach ($authorIds as $aid) {
  393. $profile = $profiles[(int) $aid] ?? null;
  394. if (! $profile) {
  395. continue;
  396. }
  397. $isAuthor = ((string) $profile['id'] === $pid);
  398. $storyItems = [];
  399. foreach (Arr::get($authorToStoryIds, $aid, []) as $sid) {
  400. $h = $storyMap[$sid] ?? null;
  401. if (! $h) {
  402. continue;
  403. }
  404. $durationMs = max(((int) $h['duration']) * 1000, 10000);
  405. $viewed = $isAuthor ? true : isset($seenCache[$aid][$sid]);
  406. $storyItems[] = [
  407. 'id' => (string) $sid,
  408. 'type' => $h['type'],
  409. 'url' => url(Storage::url($h['path'])),
  410. 'overlays' => json_decode($h['overlays'] ?? '[]', true) ?: [],
  411. 'duration' => $durationMs,
  412. 'viewed' => $viewed,
  413. 'created_at' => $h['created_at'],
  414. ...($isAuthor ? ['view_count' => (int) ($h['view_count'] ?? 0)] : []),
  415. ];
  416. }
  417. if (empty($storyItems)) {
  418. continue;
  419. }
  420. $url = ! empty($profile['local'])
  421. ? url("/stories/{$profile['username']}")
  422. : url("/i/rs/{$profile['id']}");
  423. $nodes[] = [
  424. 'id' => $isAuthor ? 'self:'.$profile['id'] : 'pfs:'.$profile['id'],
  425. 'username' => $profile['username'],
  426. 'username_acct' => $profile['acct'],
  427. 'profile_id' => (string) $profile['id'],
  428. 'avatar' => $profile['avatar'],
  429. 'is_author' => $isAuthor,
  430. 'stories' => collect($storyItems)->sortBy('id')->values()->all(),
  431. 'url' => $url,
  432. 'hasViewed' => collect($storyItems)->every(fn ($s) => $s['viewed'] === true),
  433. '_latest_ts' => $authorLatestTs[$aid] ?? 0,
  434. ];
  435. }
  436. usort($nodes, function ($a, $b) {
  437. if ($a['is_author'] && ! $b['is_author']) {
  438. return -1;
  439. }
  440. if (! $a['is_author'] && $b['is_author']) {
  441. return 1;
  442. }
  443. if ($a['hasViewed'] !== $b['hasViewed']) {
  444. return $a['hasViewed'] <=> $b['hasViewed'];
  445. }
  446. return ($b['_latest_ts'] ?? 0) <=> ($a['_latest_ts'] ?? 0);
  447. });
  448. foreach ($nodes as &$n) {
  449. unset($n['_latest_ts']);
  450. }
  451. return $nodes;
  452. }
  453. private function fetchCarouselNodesFromDatabase(int $viewerId, callable $profileHydrator): array
  454. {
  455. $following = DB::table('followers')
  456. ->where('profile_id', $viewerId)
  457. ->pluck('following_id')
  458. ->map(fn ($id) => (string) $id)
  459. ->toArray();
  460. $authorIds = array_merge([(string) $viewerId], $following);
  461. $stories = DB::table('stories')
  462. ->whereIn('profile_id', array_map('intval', $authorIds))
  463. ->where('active', true)
  464. ->where('expires_at', '>', now())
  465. ->orderBy('profile_id')
  466. ->orderBy('created_at', 'desc')
  467. ->get()
  468. ->groupBy('profile_id');
  469. if ($stories->isEmpty()) {
  470. return [];
  471. }
  472. $profiles = $profileHydrator(array_map('intval', $stories->keys()->toArray()));
  473. $nodes = [];
  474. foreach ($stories as $profileId => $profileStories) {
  475. $profile = $profiles[(int) $profileId] ?? null;
  476. if (! $profile) {
  477. continue;
  478. }
  479. $isAuthor = ((int) $profile['id'] === $viewerId);
  480. $storyItems = [];
  481. foreach ($profileStories as $story) {
  482. $durationMs = max(((int) $story->duration) * 1000, 10000);
  483. $storyItems[] = [
  484. 'id' => (string) $story->id,
  485. 'type' => $story->type,
  486. 'url' => url(Storage::url($story->path)),
  487. 'overlays' => json_decode($story->story ?? '{}', true)['overlays'] ?? [],
  488. 'duration' => $durationMs,
  489. 'viewed' => false,
  490. 'created_at' => $story->created_at,
  491. ...($isAuthor ? ['view_count' => (int) ($story->view_count ?? 0)] : []),
  492. ];
  493. }
  494. if (empty($storyItems)) {
  495. continue;
  496. }
  497. $url = ! empty($profile['local'])
  498. ? url("/stories/{$profile['username']}")
  499. : url("/i/rs/{$profile['id']}");
  500. $nodes[] = [
  501. 'id' => $isAuthor ? 'self:'.$profile['id'] : 'pfs:'.$profile['id'],
  502. 'username' => $profile['username'],
  503. 'username_acct' => $profile['acct'],
  504. 'profile_id' => (string) $profile['id'],
  505. 'avatar' => $profile['avatar'],
  506. 'is_author' => $isAuthor,
  507. 'stories' => collect($storyItems)->sortBy('id')->values()->all(),
  508. 'url' => $url,
  509. 'hasViewed' => false,
  510. '_latest_ts' => \Carbon\Carbon::parse($profileStories->first()->created_at)->timestamp,
  511. ];
  512. }
  513. usort($nodes, function ($a, $b) {
  514. if ($a['is_author'] && ! $b['is_author']) {
  515. return -1;
  516. }
  517. if (! $a['is_author'] && $b['is_author']) {
  518. return 1;
  519. }
  520. return ($b['_latest_ts'] ?? 0) <=> ($a['_latest_ts'] ?? 0);
  521. });
  522. foreach ($nodes as &$n) {
  523. unset($n['_latest_ts']);
  524. }
  525. return $nodes;
  526. }
  527. private function hydrateFollowingFromSql(int $viewerId): void
  528. {
  529. $followingKey = "following:{$viewerId}";
  530. $hasResults = false;
  531. DB::table('followers')
  532. ->where('profile_id', $viewerId)
  533. ->orderBy('id')
  534. ->chunk(1000, function ($followers) use ($followingKey, &$hasResults) {
  535. $hasResults = true;
  536. $ids = $followers->map(fn ($f) => (string) $f->following_id)->all();
  537. if (! empty($ids)) {
  538. Redis::sadd($followingKey, ...$ids);
  539. }
  540. });
  541. if (! $hasResults) {
  542. Redis::pipeline(function ($pipe) use ($followingKey) {
  543. $pipe->sadd($followingKey, '__empty__');
  544. $pipe->srem($followingKey, '__empty__');
  545. $pipe->expire($followingKey, 3600);
  546. });
  547. } else {
  548. Redis::expire($followingKey, 43200);
  549. }
  550. }
  551. private function withScoresOpt()
  552. {
  553. return config('database.redis.client') === 'predis' ? ['withscores' => true] : true;
  554. }
  555. }