Share Notes

chundev

View the Project on GitHub latteouka/share-notes

Polyglot Persistence 實戰:PostgreSQL + Elasticsearch + Neo4j 三 store 協作架構

日期:2026-04-22 情境: 一套鑑識分析平台同時跑 PG、Elasticsearch、Neo4j 三個 store。本文整理這三者的分工邊界、資料同步 pattern、worker 競爭機制與常見的 race condition 處理。


TL;DR

三個 store 各自擅長的事:PG 做 ACID、ES 做全文/向量檢索、Neo4j 做多跳關係。實戰設計必須堅守「PG 是唯一 source of truth,其他都是衍生資料」— 衍生 store 隨時可以從 PG 重建。同步靠多個 worker 搶 PENDING 任務(FOR UPDATE SKIP LOCKED),用 per-entity 狀態欄位追蹤每個衍生資料的同步進度。


一、為什麼不全塞 PostgreSQL

PG 本身很強大 — JSONB、GIN index、pgvector、ltree、full text search (tsvector)。是不是全塞 PG 就好?

實戰上試過,撞到以下瓶頸:

場景 PG 做法 痛點
跨中文全文檢索 1000 萬筆訊息 tsvector + GIN 查詢 2-5 秒,無 BM25 相關性排序,分頁 offset 深時性能崩潰
向量相似度搜尋 pgvector <-> operator IVFFlat index 在 1000 萬筆以上重建要半小時,hybrid query 麻煩
6-hop 關係查詢(中間人分析) 5-6 個 self-join Optimizer 走死,跑 30 分鐘 timeout
跨案碰撞(2000 萬筆 party × N 證物) join + UNION + EXISTS 計畫複雜度爆炸,記憶體 OOM
地理點範圍查詢 PostGIS OK 但跟鑑識 workflow 整合複雜

結論:每種 store 都有它最擅長的查詢模式,試圖全塞一個會踩所有坑


二、責任分工

該鑑識分析平台的 store 分工:

graph TB
    PG["PostgreSQL (source of truth)
所有解析 / 業務 / 使用者資料
ACID + FK
備份核心目標
(ParsedMessage / ParsedCallLog / ...)"] ES["Elasticsearch (檢索)
全文搜尋 (BM25)
向量相似度 (HNSW)
Hybrid search
5 個 index"] NEO["Neo4j (關係)
多跳 traversal
跨案碰撞偵測
中間人 path finding
5 node label + 7 種 rel"] PG -->|sync worker| ES PG -->|sync worker| NEO style PG fill:#dbeafe,stroke:#1e40af,stroke-width:2px style ES fill:#fef3c7,stroke:#92400e,stroke-width:2px style NEO fill:#dcfce7,stroke:#166534,stroke-width:2px

衍生資料 — 隨時可 drop + rebuild。

2.1 衍生資料的哲學

ES 和 Neo4j 的資料都能從 PG 重建。實戰價值:

規則:任何寫入都先落 PG,再經 worker 推到 ES/Neo4j。前端 API 讀取時兩個 store 並用(讀最適合的)。


三、Worker 架構:分化 + 競爭認領

3.1 不是一條 pipeline,是三層 worker

直覺會想用 CDC (Debezium / PG logical replication) 做一條 PG → ES/Neo4j 的 pipeline。實戰選擇是多個專用 worker 競爭認領任務

PG ExhibitDataFile 表的 status 欄位(每個 worker 認領一類): xmlStatus / elkAccountsStatus / elkCallsStatus / elkConversationsStatus / elkMessagesStatus / elkLocationsStatus / neo4jStatus / embeddingStatus

graph TB
    PARSE["parse-worker
讀 XML → 寫 ParsedXxx
設 xmlStatus = COMPLETED"] ELKP["elk-priority-worker (1-3 副本)
小資料量 status
(accounts / calls / conversations)"] NEO["neo4j-worker (1-2 副本)
處理 neo4jStatus"] ELKD["elk-data-worker (1-3 副本)
大資料量 status
(messages / locations)"] EMB["embedding-worker (1-5 副本)
讀空向量 → 呼叫 API
→ 寫回 ES embedding"] PARSE --> ELKP PARSE --> NEO PARSE --> ELKD ELKD --> EMB style PARSE fill:#fef3c7,stroke:#92400e,stroke-width:2px style ELKP fill:#dbeafe,stroke:#1e40af style ELKD fill:#dbeafe,stroke:#1e40af style EMB fill:#fce7f3,stroke:#9f1239 style NEO fill:#dcfce7,stroke:#166534

3.2 為什麼分這麼多 worker

優點 說明
平行度可調 embedding-worker 是 GPU bound,拉 5 副本。neo4j-worker 是 write bound,1 個就夠。
部分失敗不 cascade embedding API 掛了只影響向量,ES 全文搜尋 / Neo4j 關係都還能查。
優先級分化 priority worker 先搞定「基本事實」(帳號 / 通話),大資料量的 messages 同步慢沒關係。
重試獨立 某個 entity 的 embeddingStatus=FAILED 不會卡住其他 status。

3.3 Worker 競爭認領 pattern

多副本 worker 會搶同一個 pending task。必須用 DB-level atomic claim 避免衝突:

-- worker 每個 poll cycle 跑一次
SELECT id FROM ExhibitDataFile
WHERE elkMessagesStatus = 'PENDING'
  AND xmlStatus = 'COMPLETED'                   -- 依賴前面 stage 完成
ORDER BY createdAt ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;                         -- ⭐ 關鍵

FOR UPDATE SKIP LOCKED 的語意:

多個 worker 同時跑這條 query,PG 保證每個 worker 拿到不同的 row。搶到後:

UPDATE ExhibitDataFile
SET elkMessagesStatus = 'PROCESSING',
    elkMessagesClaimedBy = 'worker-abc-123',
    elkMessagesStartedAt = now()
WHERE id = $claimedId;
COMMIT;

Commit 後 lock 釋放,但 status 已經從 PENDING 變 PROCESSING — 其他 worker 不會再看到這筆。

這 pattern 比 Redis-based queue(如 BullMQ)簡單:不需要多一個元件、不需要處理 worker crash 後的 task reclaim — PG 的 status 欄位天然是單一事實。

3.4 Stuck task 回收

Worker 掛了 / OOM 被 K8s 殺掉,status 會卡在 PROCESSING。對策:

-- 超過 30 分鐘還在 PROCESSING 的 → 視為 stuck,重設回 PENDING
UPDATE ExhibitDataFile
SET elkMessagesStatus = 'PENDING',
    elkMessagesClaimedBy = null
WHERE elkMessagesStatus = 'PROCESSING'
  AND elkMessagesStartedAt < now() - interval '30 minutes';

這個 query 由一個獨立的 cleanup CronJob 每 10 分鐘跑一次。實戰上 99% 的 stuck 都是 worker 被 K8s evict,重設後下個 poll cycle 會重跑。


四、衍生資料的一致性問題

跟 RDBMS 不同,ES 和 Neo4j 的資料跟 PG 之間有 eventual consistency 差距

4.1 常見陷阱:寫 PG → 立即查 ES

// ❌ 錯誤 pattern
await db.parsedMessage.create({ data: newMsg });
const result = await elkClient.search({                // 可能 miss
  index: "messages",
  query: { match: { body: newMsg.body } },
});
// result 是空的!因為 worker 還沒跑到

解法:

實戰選第 2 個:業務可接受延遲,且 worker 吞吐夠(10s refresh interval)。

4.2 同步完整性驗證

Worker 跑完後,自動檢查「PG 應有的筆數」vs「ES 實際有的筆數」:

async function verifyElkSync(exhibitId: string) {
  const pgCount = await db.parsedMessage.count({ where: { exhibitId } });
  const esResponse = await elkClient.count({
    index: "messages",
    query: { term: { pg_exhibit_id: exhibitId } },
  });
  const esCount = esResponse.count;

  if (pgCount !== esCount) {
    console.warn(`差距: PG=${pgCount}, ES=${esCount}`);
    await syncMissing(exhibitId);                     // 自動補
  }
}

這類 reconciliation job 是衍生資料 pipeline 的必需品。實戰上會抓到 1-2% 的漏同步(通常是 worker 中途 crash)。


五、聯合查詢 pattern

典型 UI 操作:「找出這個證物裡,提到『匯款』且跟甲乙丙互動過的訊息」 — 需要 ES + Neo4j + PG 三方出力。

5.1 Pattern A:ES 先,Neo4j 後

// Step 1: ES 查包含「匯款」的訊息 ID
const esHits = await elkClient.search({
  index: "messages",
  query: {
    bool: {
      must: [{ match: { body: "匯款" } }],
      filter: [{ term: { pg_exhibit_id: exhibitId } }],
    },
  },
  size: 1000,
});
const messageIds = esHits.hits.hits.map(h => h._source.pg_id);
const senderPartyIds = esHits.hits.hits
  .map(h => h._source.sender.pg_party_id)
  .filter(Boolean);

// Step 2: Neo4j 查這些 sender 跟甲乙丙有沒有互動
const neo4jResult = await session.run(`
  MATCH (s:Party)-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]->(i)
        <-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]-(target:Party)
  WHERE s.pg_id IN $senderIds
    AND target.pg_id IN $targetIds
  RETURN DISTINCT s.pg_id AS sender_id
`, { senderIds: senderPartyIds, targetIds: ["", "", ""] });

const validSenderIds = new Set(neo4jResult.records.map(r => r.get("sender_id")));

// Step 3: 過濾 ES 結果
const finalMessages = esHits.hits.hits.filter(h =>
  validSenderIds.has(h._source.sender.pg_party_id)
);

選 ES 先的理由:全文搜尋容易先把候選集縮小到幾百筆,再丟 Neo4j 查就快。

5.2 Pattern B:Neo4j 先,ES 後

// Step 1: Neo4j 找所有跟甲乙丙互動過的 party
const partiesResult = await session.run(`
  MATCH (p:Party)-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]->(i)
        <-[:PARTICIPATES_IN|PARTICIPATES_IN_CALL]-(target:Party)
  WHERE target.pg_id IN $targetIds
  RETURN DISTINCT p.pg_id AS party_id
`, { targetIds });

// Step 2: ES 用 term filter 縮到這些 party 的訊息
const esHits = await elkClient.search({
  index: "messages",
  query: {
    bool: {
      must: [{ match: { body: "匯款" } }],
      filter: [
        { term: { pg_exhibit_id: exhibitId } },
        { terms: { "sender.pg_party_id": partyIds } },
      ],
    },
  },
});

選 Neo4j 先的理由:關係條件很嚴格時(e.g. 3-hop),先用 graph 把候選集縮小。

5.3 選擇依據

先走 selectivity 高的那邊。如果「匯款」只命中 1000 筆訊息,ES 先。如果「跟甲乙丙互動過的 party」只有 50 個,Neo4j 先。實戰寫查詢時要 EXPLAIN / profile 確認走對順序。


六、學到的事


七、什麼時候 不該 上 polyglot persistence

這個 pattern 的 overhead 很高:

如果業務查詢 pattern 簡單(CRUD + 少量 reporting)、資料量 < 1000 萬筆、沒有多跳關係 / 全文檢索 / 向量搜尋其中一項,PG + 適當 index 就好。不要為了炫技上三套。

實戰判斷標準:在 PG 上跑 EXPLAIN ANALYZE 看你的核心查詢,超過 1 秒且沒辦法優化、資料量還會增長 10×以上,才開始考慮加第二個 store。


參考資料