events_poll.php 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. <?php
  2. // Long-poll endpoint. Holds the connection open for up to ~28 seconds until
  3. // either (a) the job_history or messages tables receive a row beyond the
  4. // caller's since markers, or (b) it times out. Either way returns the current
  5. // max IDs so the client can update its trackers.
  6. //
  7. // One PHP-FPM worker is parked per connected client while this runs. For 2-3
  8. // users that's fine; if more users start using this app, raise pm.max_children.
  9. require_once __DIR__ . '/../lib/identity.php';
  10. [$actor, $vendor_id] = resolve_request_actor();
  11. $since_h = (int) ($_GET['since_h'] ?? 0);
  12. // Messages "since" markers.
  13. // Vendor caller: a single int (their own vendor's last seen message id).
  14. // ICG caller: "vid:id,vid:id" pairs (one per vendor thread on PDQ.php).
  15. $since_m = [];
  16. if ($actor === 'ICG') {
  17. foreach (array_filter(explode(',', (string) ($_GET['since_m'] ?? ''))) as $pair) {
  18. $p = explode(':', $pair);
  19. if (count($p) === 2) $since_m[(int) $p[0]] = (int) $p[1];
  20. }
  21. } else {
  22. $since_m[$vendor_id] = (int) ($_GET['since_m'] ?? 0);
  23. }
  24. // PHP's default max_execution_time is 30s; give ourselves slightly more so the
  25. // loop can finish writing the response before the deadline.
  26. @set_time_limit(45);
  27. $pdo = db();
  28. $vfilter = $actor === 'ICG' ? null : $vendor_id;
  29. $deadline = microtime(true) + 28.0;
  30. header('Content-Type: application/json');
  31. while (true) {
  32. [$cur_h, $cur_m] = events_snapshot($pdo, $vfilter);
  33. $history_changed = $cur_h > $since_h;
  34. $changed_vendors = [];
  35. foreach ($cur_m as $vid => $maxid) {
  36. if ($maxid > ($since_m[$vid] ?? 0)) $changed_vendors[] = $vid;
  37. }
  38. if ($history_changed || $changed_vendors) {
  39. echo json_encode([
  40. 'history' => $cur_h,
  41. 'messages' => $cur_m,
  42. 'history_changed' => $history_changed,
  43. 'changed_vendors' => $changed_vendors,
  44. ]);
  45. return;
  46. }
  47. if (microtime(true) >= $deadline) {
  48. echo json_encode([
  49. 'history' => $cur_h,
  50. 'messages' => $cur_m,
  51. 'timeout' => true,
  52. ]);
  53. return;
  54. }
  55. // Bail early if the client has gone away (cheap check — flush updates the
  56. // connection state). Otherwise sleep 1s and loop.
  57. if (connection_aborted()) return;
  58. usleep(1000000);
  59. }
  60. function events_snapshot(PDO $pdo, ?int $vendor_filter): array {
  61. if ($vendor_filter === null) {
  62. $h = (int) $pdo->query('SELECT COALESCE(MAX(id), 0) FROM job_history')->fetchColumn();
  63. $m = [];
  64. foreach ($pdo->query('SELECT vendor_id, COALESCE(MAX(id), 0) AS maxid FROM messages GROUP BY vendor_id')->fetchAll() as $r) {
  65. $m[(int) $r['vendor_id']] = (int) $r['maxid'];
  66. }
  67. // Vendors with zero messages get a 0 entry so the client tracks them.
  68. foreach ($pdo->query('SELECT id FROM vendors WHERE active = 1')->fetchAll() as $v) {
  69. if (!isset($m[(int) $v['id']])) $m[(int) $v['id']] = 0;
  70. }
  71. } else {
  72. $stmt = $pdo->prepare(
  73. 'SELECT COALESCE(MAX(h.id), 0)
  74. FROM job_history h
  75. JOIN jobs j ON j.id = h.job_id
  76. WHERE j.vendor_id = ?'
  77. );
  78. $stmt->execute([$vendor_filter]);
  79. $h = (int) $stmt->fetchColumn();
  80. $stmt = $pdo->prepare('SELECT COALESCE(MAX(id), 0) FROM messages WHERE vendor_id = ?');
  81. $stmt->execute([$vendor_filter]);
  82. $m = [$vendor_filter => (int) $stmt->fetchColumn()];
  83. }
  84. return [$h, $m];
  85. }