chundev
日期:2026-04-22 情境: 一套鑑識分析平台同時跑 PG、Elasticsearch、Neo4j 三個 store。本文整理這三者的分工邊界、資料同步 pattern、worker 競爭機制與常見的 race condition 處理。
三個 store 各自擅長的事:PG 做 ACID、ES 做全文/向量檢索、Neo4j 做多跳關係。實戰設計必須堅守「PG 是唯一 source of truth,其他都是衍生資料」— 衍生 store 隨時可以從 PG 重建。同步靠多個 worker 搶 PENDING 任務(FOR UPDATE SKIP LOCKED),用 per-entity 狀態欄位追蹤每個衍生資料的同步進度。
PG 本身很強大 — JSONB、GIN index、pgvector、ltree、full text search (tsvector)。是不是全塞 PG 就好?
實戰上試過,撞到以下瓶頸:
| 場景 | PG 做法 | 痛點 |
|---|---|---|
| 跨中文全文檢索 1000 萬筆訊息 | tsvector + GIN |
查詢 2-5 秒,無 BM25 相關性排序,分頁 offset 深時性能崩潰 |
| 向量相似度搜尋 | pgvector <-> operator |
IVFFlat index 在 1000 萬筆以上重建要半小時,hybrid query 麻煩 |
| 6-hop 關係查詢(中間人分析) | 5-6 個 self-join | Optimizer 走死,跑 30 分鐘 timeout |
| 跨案碰撞(2000 萬筆 party × N 證物) | join + UNION + EXISTS | 計畫複雜度爆炸,記憶體 OOM |
| 地理點範圍查詢 | PostGIS | OK 但跟鑑識 workflow 整合複雜 |
結論:每種 store 都有它最擅長的查詢模式,試圖全塞一個會踩所有坑。
該鑑識分析平台的 store 分工:
graph TB
PG["PostgreSQL (source of truth)
所有解析 / 業務 / 使用者資料
ACID + FK
備份核心目標
(ParsedMessage / ParsedCallLog / ...)"]
ES["Elasticsearch (檢索)
全文搜尋 (BM25)
向量相似度 (HNSW)
Hybrid search
5 個 index"]
NEO["Neo4j (關係)
多跳 traversal
跨案碰撞偵測
中間人 path finding
5 node label + 7 種 rel"]
PG -->|sync worker| ES
PG -->|sync worker| NEO
style PG fill:#dbeafe,stroke:#1e40af,stroke-width:2px
style ES fill:#fef3c7,stroke:#92400e,stroke-width:2px
style NEO fill:#dcfce7,stroke:#166534,stroke-width:2px
衍生資料 — 隨時可 drop + rebuild。
ES 和 Neo4j 的資料都能從 PG 重建。實戰價值:
pnpm elk 重跑把 PG 全量同步。Neo4j 同理。備份只需要 PG dump + 不可重建的應用檔案(報告草稿等)。規則:任何寫入都先落 PG,再經 worker 推到 ES/Neo4j。前端 API 讀取時兩個 store 並用(讀最適合的)。
直覺會想用 CDC (Debezium / PG logical replication) 做一條 PG → ES/Neo4j 的 pipeline。實戰選擇是多個專用 worker 競爭認領任務:
PG ExhibitDataFile 表的 status 欄位(每個 worker 認領一類):
xmlStatus / elkAccountsStatus / elkCallsStatus / elkConversationsStatus / elkMessagesStatus / elkLocationsStatus / neo4jStatus / embeddingStatus
graph TB
PARSE["parse-worker
讀 XML → 寫 ParsedXxx
設 xmlStatus = COMPLETED"]
ELKP["elk-priority-worker (1-3 副本)
小資料量 status
(accounts / calls / conversations)"]
NEO["neo4j-worker (1-2 副本)
處理 neo4jStatus"]
ELKD["elk-data-worker (1-3 副本)
大資料量 status
(messages / locations)"]
EMB["embedding-worker (1-5 副本)
讀空向量 → 呼叫 API
→ 寫回 ES embedding"]
PARSE --> ELKP
PARSE --> NEO
PARSE --> ELKD
ELKD --> EMB
style PARSE fill:#fef3c7,stroke:#92400e,stroke-width:2px
style ELKP fill:#dbeafe,stroke:#1e40af
style ELKD fill:#dbeafe,stroke:#1e40af
style EMB fill:#fce7f3,stroke:#9f1239
style NEO fill:#dcfce7,stroke:#166534
| 優點 | 說明 |
|---|---|
| 平行度可調 | embedding-worker 是 GPU bound,拉 5 副本。neo4j-worker 是 write bound,1 個就夠。 |
| 部分失敗不 cascade | embedding API 掛了只影響向量,ES 全文搜尋 / Neo4j 關係都還能查。 |
| 優先級分化 | priority worker 先搞定「基本事實」(帳號 / 通話),大資料量的 messages 同步慢沒關係。 |
| 重試獨立 | 某個 entity 的 embeddingStatus=FAILED 不會卡住其他 status。 |
多副本 worker 會搶同一個 pending task。必須用 DB-level atomic claim 避免衝突:
-- worker 每個 poll cycle 跑一次
SELECT id FROM ExhibitDataFile
WHERE elkMessagesStatus = 'PENDING'
AND xmlStatus = 'COMPLETED' -- 依賴前面 stage 完成
ORDER BY createdAt ASC
LIMIT 1
FOR UPDATE SKIP LOCKED; -- ⭐ 關鍵
FOR UPDATE SKIP LOCKED 的語意:
FOR UPDATE:取得 row lockSKIP LOCKED:如果這 row 被別的 transaction 鎖住了,跳過它,不等待多個 worker 同時跑這條 query,PG 保證每個 worker 拿到不同的 row。搶到後:
UPDATE ExhibitDataFile
SET elkMessagesStatus = 'PROCESSING',
elkMessagesClaimedBy = 'worker-abc-123',
elkMessagesStartedAt = now()
WHERE id = $claimedId;
COMMIT;
Commit 後 lock 釋放,但 status 已經從 PENDING 變 PROCESSING — 其他 worker 不會再看到這筆。
這 pattern 比 Redis-based queue(如 BullMQ)簡單:不需要多一個元件、不需要處理 worker crash 後的 task reclaim — PG 的 status 欄位天然是單一事實。
Worker 掛了 / OOM 被 K8s 殺掉,status 會卡在 PROCESSING。對策:
-- 超過 30 分鐘還在 PROCESSING 的 → 視為 stuck,重設回 PENDING
UPDATE ExhibitDataFile
SET elkMessagesStatus = 'PENDING',
elkMessagesClaimedBy = null
WHERE elkMessagesStatus = 'PROCESSING'
AND elkMessagesStartedAt < now() - interval '30 minutes';
這個 query 由一個獨立的 cleanup CronJob 每 10 分鐘跑一次。實戰上 99% 的 stuck 都是 worker 被 K8s evict,重設後下個 poll cycle 會重跑。
跟 RDBMS 不同,ES 和 Neo4j 的資料跟 PG 之間有 eventual consistency 差距。
// ❌ 錯誤 pattern
await db.parsedMessage.create({ data: newMsg });
const result = await elkClient.search({ // 可能 miss
index: "messages",
query: { match: { body: newMsg.body } },
});
// result 是空的!因為 worker 還沒跑到
解法:
實戰選第 2 個:業務可接受延遲,且 worker 吞吐夠(10s refresh interval)。
Worker 跑完後,自動檢查「PG 應有的筆數」vs「ES 實際有的筆數」:
async function verifyElkSync(exhibitId: string) {
const pgCount = await db.parsedMessage.count({ where: { exhibitId } });
const esResponse = await elkClient.count({
index: "messages",
query: { term: { pg_exhibit_id: exhibitId } },
});
const esCount = esResponse.count;
if (pgCount !== esCount) {
console.warn(`差距: PG=${pgCount}, ES=${esCount}`);
await syncMissing(exhibitId); // 自動補
}
}
這類 reconciliation job 是衍生資料 pipeline 的必需品。實戰上會抓到 1-2% 的漏同步(通常是 worker 中途 crash)。
典型 UI 操作:「找出這個證物裡,提到『匯款』且跟甲乙丙互動過的訊息」 — 需要 ES + Neo4j + PG 三方出力。
// Step 1: ES 查包含「匯款」的訊息 ID
const esHits = await elkClient.search({
index: "messages",
query: {
bool: {
must: [{ match: { body: "匯款" } }],
filter: [{ term: { pg_exhibit_id: exhibitId } }],
},
},
size: 1000,
});
const messageIds = esHits.hits.hits.map(h => h._source.pg_id);
const senderPartyIds = esHits.hits.hits
.map(h => h._source.sender.pg_party_id)
.filter(Boolean);
// Step 2: Neo4j 查這些 sender 跟甲乙丙有沒有互動
const neo4jResult = await session.run(`
MATCH (s:Party)-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]->(i)
<-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]-(target:Party)
WHERE s.pg_id IN $senderIds
AND target.pg_id IN $targetIds
RETURN DISTINCT s.pg_id AS sender_id
`, { senderIds: senderPartyIds, targetIds: ["甲", "乙", "丙"] });
const validSenderIds = new Set(neo4jResult.records.map(r => r.get("sender_id")));
// Step 3: 過濾 ES 結果
const finalMessages = esHits.hits.hits.filter(h =>
validSenderIds.has(h._source.sender.pg_party_id)
);
選 ES 先的理由:全文搜尋容易先把候選集縮小到幾百筆,再丟 Neo4j 查就快。
// Step 1: Neo4j 找所有跟甲乙丙互動過的 party
const partiesResult = await session.run(`
MATCH (p:Party)-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]->(i)
<-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]-(target:Party)
WHERE target.pg_id IN $targetIds
RETURN DISTINCT p.pg_id AS party_id
`, { targetIds });
// Step 2: ES 用 term filter 縮到這些 party 的訊息
const esHits = await elkClient.search({
index: "messages",
query: {
bool: {
must: [{ match: { body: "匯款" } }],
filter: [
{ term: { pg_exhibit_id: exhibitId } },
{ terms: { "sender.pg_party_id": partyIds } },
],
},
},
});
選 Neo4j 先的理由:關係條件很嚴格時(e.g. 3-hop),先用 graph 把候選集縮小。
先走 selectivity 高的那邊。如果「匯款」只命中 1000 筆訊息,ES 先。如果「跟甲乙丙互動過的 party」只有 50 個,Neo4j 先。實戰寫查詢時要 EXPLAIN / profile 確認走對順序。
status 欄位 + FOR UPDATE SKIP LOCKED 對很多場景更簡單、更可觀察、失敗更容易重試。這個 pattern 的 overhead 很高:
如果業務查詢 pattern 簡單(CRUD + 少量 reporting)、資料量 < 1000 萬筆、沒有多跳關係 / 全文檢索 / 向量搜尋其中一項,PG + 適當 index 就好。不要為了炫技上三套。
實戰判斷標準:在 PG 上跑 EXPLAIN ANALYZE 看你的核心查詢,超過 1 秒且沒辦法優化、資料量還會增長 10×以上,才開始考慮加第二個 store。