Browse Source

added long-polling to make updates appear instantly. This is probably overkill

Bernn 1 tuần trước cách đây
mục cha
commit
3036a1abf4
5 tập tin đã thay đổi với 182 bổ sung2 xóa
  1. 9 1
      PDQ.php
  2. 55 0
      assets/app.js
  3. 97 0
      bin/events_poll.php
  4. 16 0
      lib/render.php
  5. 5 1
      vendor.php

+ 9 - 1
PDQ.php

@@ -4,6 +4,12 @@ require_once __DIR__ . '/lib/render.php';
 
 $actor = current_actor('ICG');
 $vendors = all_vendors();
+
+$initialMessageMaxIds = [];
+foreach ($vendors as $v) {
+    $initialMessageMaxIds[(int) $v['id']] = max_message_id((int) $v['id']);
+}
+$initialHistoryId = max_history_id();
 ?><!doctype html>
 <html lang="en">
 <head>
@@ -15,7 +21,9 @@ window.PDQ = {
     actor: 'ICG',
     audience: 'ICG',
     vendors: <?= json_encode(array_map(fn($v) => ['slug' => $v['slug'], 'name' => $v['name']], $vendors)) ?>,
-    pollMs: 60000
+    pollMs: 60000,
+    initialHistoryId: <?= (int) $initialHistoryId ?>,
+    initialMessageMaxIds: <?= json_encode((object) $initialMessageMaxIds) ?>
 };
 </script>
 <script src="assets/app.js" defer></script>

+ 55 - 0
assets/app.js

@@ -373,11 +373,66 @@
     reloadAllThreads();
   }
 
+  // -------- long-poll (real-time push) --------
+  //
+  // The server holds the connection open until job_history or messages get
+  // a row beyond our since markers, then immediately tells us what changed.
+  // We reload only the affected piece. The setInterval timer below remains
+  // as a safety net for transient network/server failures.
+
+  let lpHistoryId = (PDQ.initialHistoryId | 0) || 0;
+  let lpMessageMaxIds = Object.assign({}, PDQ.initialMessageMaxIds || {});
+  let lpAbort = null;
+  let lpStopped = false;
+
+  function buildSinceM() {
+    if (PDQ.audience === 'vendor') {
+      return String(lpMessageMaxIds[PDQ.vendorId] || 0);
+    }
+    return Object.entries(lpMessageMaxIds).map(([k, v]) => k + ':' + v).join(',');
+  }
+
+  function startLongPoll() {
+    if (lpStopped) return;
+    lpAbort = new AbortController();
+    const url = new URL('bin/events_poll.php', window.location.href);
+    Object.entries(authParams()).forEach(([k, v]) => url.searchParams.set(k, v));
+    url.searchParams.set('since_h', String(lpHistoryId));
+    url.searchParams.set('since_m', buildSinceM());
+
+    fetch(url.toString(), { signal: lpAbort.signal })
+      .then(r => r.json())
+      .then(data => {
+        // Always update the since markers so the long-poll moves forward and
+        // doesn't re-fire the same event in a tight loop.
+        if (typeof data.history === 'number') lpHistoryId = data.history;
+        if (data.messages && typeof data.messages === 'object') {
+          Object.entries(data.messages).forEach(([vid, maxid]) => {
+            lpMessageMaxIds[vid] = maxid;
+          });
+        }
+        // Skip the actual reload if the user is editing — would tear out
+        // their input. The 60s tick() and the next change will catch them
+        // up once they're done.
+        const busy = isUserBusy() || composeRowId !== null;
+        if (!busy && data.history_changed) reloadTable();
+        if (!busy && data.changed_vendors && data.changed_vendors.length) reloadAllThreads();
+        startLongPoll(); // immediately reopen
+      })
+      .catch(err => {
+        if (err.name === 'AbortError') return;
+        // Network blip / server hiccup: back off briefly and retry. The 60s
+        // tick() will keep things fresh in the meantime.
+        setTimeout(startLongPoll, 2000);
+      });
+  }
+
   document.addEventListener('DOMContentLoaded', () => {
     wireAddJob();
     wireCompose();
     reloadTable();
     reloadAllThreads();
+    startLongPoll();
     const interval = PDQ.pollMs || 60000;
     setInterval(tick, interval);
   });

+ 97 - 0
bin/events_poll.php

@@ -0,0 +1,97 @@
+<?php
+// Long-poll endpoint. Holds the connection open for up to ~28 seconds until
+// either (a) the job_history or messages tables receive a row beyond the
+// caller's since markers, or (b) it times out. Either way returns the current
+// max IDs so the client can update its trackers.
+//
+// One PHP-FPM worker is parked per connected client while this runs. For 2-3
+// users that's fine; if more users start using this app, raise pm.max_children.
+
+require_once __DIR__ . '/../lib/identity.php';
+
+[$actor, $vendor_id] = resolve_request_actor();
+
+$since_h = (int) ($_GET['since_h'] ?? 0);
+
+// Messages "since" markers.
+//   Vendor caller: a single int (their own vendor's last seen message id).
+//   ICG caller:    "vid:id,vid:id" pairs (one per vendor thread on PDQ.php).
+$since_m = [];
+if ($actor === 'ICG') {
+    foreach (array_filter(explode(',', (string) ($_GET['since_m'] ?? ''))) as $pair) {
+        $p = explode(':', $pair);
+        if (count($p) === 2) $since_m[(int) $p[0]] = (int) $p[1];
+    }
+} else {
+    $since_m[$vendor_id] = (int) ($_GET['since_m'] ?? 0);
+}
+
+// PHP's default max_execution_time is 30s; give ourselves slightly more so the
+// loop can finish writing the response before the deadline.
+@set_time_limit(45);
+
+$pdo = db();
+$vfilter = $actor === 'ICG' ? null : $vendor_id;
+$deadline = microtime(true) + 28.0;
+
+header('Content-Type: application/json');
+
+while (true) {
+    [$cur_h, $cur_m] = events_snapshot($pdo, $vfilter);
+
+    $history_changed = $cur_h > $since_h;
+    $changed_vendors = [];
+    foreach ($cur_m as $vid => $maxid) {
+        if ($maxid > ($since_m[$vid] ?? 0)) $changed_vendors[] = $vid;
+    }
+
+    if ($history_changed || $changed_vendors) {
+        echo json_encode([
+            'history'          => $cur_h,
+            'messages'         => $cur_m,
+            'history_changed'  => $history_changed,
+            'changed_vendors'  => $changed_vendors,
+        ]);
+        return;
+    }
+    if (microtime(true) >= $deadline) {
+        echo json_encode([
+            'history'  => $cur_h,
+            'messages' => $cur_m,
+            'timeout'  => true,
+        ]);
+        return;
+    }
+    // Bail early if the client has gone away (cheap check — flush updates the
+    // connection state). Otherwise sleep 1s and loop.
+    if (connection_aborted()) return;
+    usleep(1000000);
+}
+
+function events_snapshot(PDO $pdo, ?int $vendor_filter): array {
+    if ($vendor_filter === null) {
+        $h = (int) $pdo->query('SELECT COALESCE(MAX(id), 0) FROM job_history')->fetchColumn();
+        $m = [];
+        foreach ($pdo->query('SELECT vendor_id, COALESCE(MAX(id), 0) AS maxid FROM messages GROUP BY vendor_id')->fetchAll() as $r) {
+            $m[(int) $r['vendor_id']] = (int) $r['maxid'];
+        }
+        // Vendors with zero messages get a 0 entry so the client tracks them.
+        foreach ($pdo->query('SELECT id FROM vendors WHERE active = 1')->fetchAll() as $v) {
+            if (!isset($m[(int) $v['id']])) $m[(int) $v['id']] = 0;
+        }
+    } else {
+        $stmt = $pdo->prepare(
+            'SELECT COALESCE(MAX(h.id), 0)
+               FROM job_history h
+               JOIN jobs j ON j.id = h.job_id
+              WHERE j.vendor_id = ?'
+        );
+        $stmt->execute([$vendor_filter]);
+        $h = (int) $stmt->fetchColumn();
+
+        $stmt = $pdo->prepare('SELECT COALESCE(MAX(id), 0) FROM messages WHERE vendor_id = ?');
+        $stmt->execute([$vendor_filter]);
+        $m = [$vendor_filter => (int) $stmt->fetchColumn()];
+    }
+    return [$h, $m];
+}

+ 16 - 0
lib/render.php

@@ -193,3 +193,19 @@ function max_message_id(int $vendor_id): int {
     $stmt->execute([$vendor_id]);
     return (int) $stmt->fetchColumn();
 }
+
+// Scoped to a vendor's jobs when $vendor_id is given (so a vendor doesn't get
+// woken up by another vendor's history). ICG callers pass null for global max.
+function max_history_id(?int $vendor_id = null): int {
+    if ($vendor_id === null) {
+        return (int) db()->query('SELECT COALESCE(MAX(id), 0) FROM job_history')->fetchColumn();
+    }
+    $stmt = db()->prepare(
+        'SELECT COALESCE(MAX(h.id), 0)
+           FROM job_history h
+           JOIN jobs j ON j.id = h.job_id
+          WHERE j.vendor_id = ?'
+    );
+    $stmt->execute([$vendor_id]);
+    return (int) $stmt->fetchColumn();
+}

+ 5 - 1
vendor.php

@@ -6,6 +6,7 @@ $slug = $_GET['v'] ?? '';
 $vendor = current_actor_from_vendor($slug);
 $vid = (int) $vendor['id'];
 $maxId = max_message_id($vid);
+$initialHistoryId = max_history_id($vid);
 ?><!doctype html>
 <html lang="en">
 <head>
@@ -16,8 +17,11 @@ $maxId = max_message_id($vid);
 window.PDQ = {
     actor: <?= json_encode($vendor['slug']) ?>,
     audience: 'vendor',
+    vendorId: <?= (int) $vid ?>,
     vendors: [<?= json_encode(['slug' => $vendor['slug'], 'name' => $vendor['name']]) ?>],
-    pollMs: 60000
+    pollMs: 60000,
+    initialHistoryId: <?= (int) $initialHistoryId ?>,
+    initialMessageMaxIds: <?= json_encode((object) [$vid => $maxId]) ?>
 };
 </script>
 <script src="assets/app.js" defer></script>