feat(chat): 支持聊天记录实时更新(WCDB + SSE)

- 新增 WCDB realtime 连接管理与 DLL 依赖,并在 shutdown 释放资源

- 新增实时状态/变更 SSE 与增量同步接口(单会话/全会话)

- sessions/messages 支持 source=realtime,增强内容解码与 packed_info md5 提取

- wheel 打包包含 WCDB.dll / wcdb_api.dll
This commit is contained in:
2977094657
2026-01-01 23:23:01 +08:00
parent 219143f663
commit 57ffcd3aa0
7 changed files with 1966 additions and 128 deletions

View File

@@ -30,4 +30,8 @@ build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["src/wechat_decrypt_tool"] packages = ["src/wechat_decrypt_tool"]
include = ["src/wechat_decrypt_tool/native/VoipEngine.dll"] include = [
"src/wechat_decrypt_tool/native/VoipEngine.dll",
"src/wechat_decrypt_tool/native/wcdb_api.dll",
"src/wechat_decrypt_tool/native/WCDB.dll",
]

View File

@@ -13,6 +13,7 @@ from .routers.health import router as _health_router
from .routers.keys import router as _keys_router from .routers.keys import router as _keys_router
from .routers.media import router as _media_router from .routers.media import router as _media_router
from .routers.wechat_detection import router as _wechat_detection_router from .routers.wechat_detection import router as _wechat_detection_router
from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown
# 初始化日志系统 # 初始化日志系统
setup_logging() setup_logging()
@@ -46,6 +47,18 @@ app.include_router(_chat_export_router)
app.include_router(_chat_media_router) app.include_router(_chat_media_router)
@app.on_event("shutdown")
async def _shutdown_wcdb_realtime() -> None:
try:
WCDB_REALTIME.close_all()
except Exception:
pass
try:
_wcdb_shutdown()
except Exception:
pass
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn

View File

@@ -321,8 +321,77 @@ def _looks_like_xml(s: str) -> bool:
def _decode_message_content(compress_value: Any, message_value: Any) -> str: def _decode_message_content(compress_value: Any, message_value: Any) -> str:
def try_decode_text_blob(text: str) -> Optional[str]:
t = (text or "").strip()
if not t:
return None
# zstd frame magic: 28 b5 2f fd
zstd_magic = b"\x28\xb5\x2f\xfd"
if len(t) >= 16 and len(t) % 2 == 0 and re.fullmatch(r"[0-9a-fA-F]+", t):
try:
raw = bytes.fromhex(t)
if zstd is not None and raw.startswith(zstd_magic):
try:
out = zstd.decompress(raw)
s2 = out.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
return s2
except Exception:
pass
s2 = raw.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
# Avoid decoding user-sent pure-hex text (e.g. "68656c6c6f") into arbitrary strings;
# only accept non-zstd hex if it still looks like a message XML payload.
s2_lower = s2.lower()
if (
_looks_like_xml(s2)
or ("<msg" in s2_lower and "</msg>" in s2_lower)
or "<appmsg" in s2_lower
):
return s2
except Exception:
return None
if len(t) >= 24 and len(t) % 4 == 0 and re.fullmatch(r"[A-Za-z0-9+/=]+", t):
try:
raw = base64.b64decode(t)
if zstd is not None and raw.startswith(zstd_magic):
try:
out = zstd.decompress(raw)
s2 = out.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
return s2
except Exception:
pass
s2 = raw.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
s2_lower = s2.lower()
if (
_looks_like_xml(s2)
or ("<msg" in s2_lower and "</msg>" in s2_lower)
or "<appmsg" in s2_lower
):
return s2
except Exception:
return None
return None
msg_text = _decode_sqlite_text(message_value) msg_text = _decode_sqlite_text(message_value)
# Realtime WCDB mode can return message_content as a hex/base64 encoded blob string
# (often a zstd frame starting with 28b52ffd...), while compress_content is null.
# NOTE: some callers set sqlite3.text_factory=bytes, so TEXT may arrive as bytes even when it
# is actually hex/base64 text; decode from msg_text, not from the raw python type.
s = html.unescape(msg_text.strip())
s2 = try_decode_text_blob(s)
if s2:
msg_text = s2
if isinstance(message_value, (bytes, bytearray, memoryview)): if isinstance(message_value, (bytes, bytearray, memoryview)):
raw = bytes(message_value) if isinstance(message_value, memoryview) else message_value raw = bytes(message_value) if isinstance(message_value, memoryview) else message_value
if raw.startswith(b"\x28\xb5\x2f\xfd") and zstd is not None: if raw.startswith(b"\x28\xb5\x2f\xfd") and zstd is not None:
@@ -338,51 +407,6 @@ def _decode_message_content(compress_value: Any, message_value: Any) -> str:
if compress_value is None: if compress_value is None:
return msg_text return msg_text
def try_decode_text_blob(text: str) -> Optional[str]:
t = (text or "").strip()
if not t:
return None
if len(t) >= 16 and len(t) % 2 == 0 and re.fullmatch(r"[0-9a-fA-F]+", t):
try:
raw = bytes.fromhex(t)
if zstd is not None:
try:
out = zstd.decompress(raw)
s2 = out.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
return s2
except Exception:
pass
s2 = raw.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
return s2
except Exception:
return None
if len(t) >= 24 and len(t) % 4 == 0 and re.fullmatch(r"[A-Za-z0-9+/=]+", t):
try:
raw = base64.b64decode(t)
if zstd is not None:
try:
out = zstd.decompress(raw)
s2 = out.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
return s2
except Exception:
pass
s2 = raw.decode("utf-8", errors="ignore")
s2 = html.unescape(s2.strip())
if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
return s2
except Exception:
return None
return None
if isinstance(compress_value, str): if isinstance(compress_value, str):
s = html.unescape(compress_value.strip()) s = html.unescape(compress_value.strip())
s2 = try_decode_text_blob(s) s2 = try_decode_text_blob(s)
@@ -427,6 +451,7 @@ def _decode_message_content(compress_value: Any, message_value: Any) -> str:
_MD5_HEX_RE = re.compile(rb"(?i)[0-9a-f]{32}") _MD5_HEX_RE = re.compile(rb"(?i)[0-9a-f]{32}")
_DAT_MD5_RE = re.compile(rb"(?i)([0-9a-f]{32})(?:[._][thbc])?\.dat") _DAT_MD5_RE = re.compile(rb"(?i)([0-9a-f]{32})(?:[._][thbc])?\.dat")
_PACKED_INFO_HEX_RE = re.compile(r"(?i)^[0-9a-f]+$")
def _extract_md5_from_blob(blob: Any) -> str: def _extract_md5_from_blob(blob: Any) -> str:
@@ -469,6 +494,42 @@ def _extract_md5_from_blob(blob: Any) -> str:
return "" return ""
def _extract_md5_from_packed_info(packed_info: Any) -> str:
if packed_info is None:
return ""
data: bytes = b""
if isinstance(packed_info, memoryview):
data = packed_info.tobytes()
elif isinstance(packed_info, (bytes, bytearray)):
data = bytes(packed_info)
elif isinstance(packed_info, str):
s = packed_info.strip()
if s.lower().startswith("0x"):
s = s[2:]
if s and _PACKED_INFO_HEX_RE.fullmatch(s) and (len(s) % 2 == 0):
try:
data = bytes.fromhex(s)
except Exception:
data = b""
else:
data = s.encode("utf-8", errors="ignore")
else:
if isinstance(packed_info, (int, float, bool)):
data = b""
else:
try:
data = bytes(packed_info)
except Exception:
data = b""
md5 = _extract_md5_from_blob(data)
md5 = str(md5 or "").strip().lower()
if len(md5) == 32 and all(c in "0123456789abcdef" for c in md5):
return md5
return ""
def _resource_lookup_chat_id(resource_conn: sqlite3.Connection, username: str) -> Optional[int]: def _resource_lookup_chat_id(resource_conn: sqlite3.Connection, username: str) -> Optional[int]:
if not username: if not username:
return None return None

Binary file not shown.

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,476 @@
import ctypes
import json
import os
import sys
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from .key_store import get_account_keys_from_store
from .logging_config import get_logger
from .media_helpers import _resolve_account_db_storage_dir
logger = get_logger(__name__)
class WCDBRealtimeError(RuntimeError):
pass
_NATIVE_DIR = Path(__file__).resolve().parent / "native"
_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
_lib_lock = threading.Lock()
_lib: Optional[ctypes.CDLL] = None
_initialized = False
def _is_windows() -> bool:
return sys.platform.startswith("win")
def _load_wcdb_lib() -> ctypes.CDLL:
global _lib
with _lib_lock:
if _lib is not None:
return _lib
if not _is_windows():
raise WCDBRealtimeError("WCDB realtime mode is only supported on Windows.")
if not _WCDB_API_DLL.exists():
raise WCDBRealtimeError(f"Missing wcdb_api.dll at: {_WCDB_API_DLL}")
# Ensure dependent DLLs (e.g. WCDB.dll) can be found.
try:
os.add_dll_directory(str(_NATIVE_DIR))
except Exception:
pass
lib = ctypes.CDLL(str(_WCDB_API_DLL))
# Signatures
lib.wcdb_init.argtypes = []
lib.wcdb_init.restype = ctypes.c_int
lib.wcdb_shutdown.argtypes = []
lib.wcdb_shutdown.restype = ctypes.c_int
lib.wcdb_open_account.argtypes = [
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.POINTER(ctypes.c_int64),
]
lib.wcdb_open_account.restype = ctypes.c_int
lib.wcdb_close_account.argtypes = [ctypes.c_int64]
lib.wcdb_close_account.restype = ctypes.c_int
lib.wcdb_get_sessions.argtypes = [ctypes.c_int64, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_sessions.restype = ctypes.c_int
lib.wcdb_get_messages.argtypes = [
ctypes.c_int64,
ctypes.c_char_p,
ctypes.c_int32,
ctypes.c_int32,
ctypes.POINTER(ctypes.c_char_p),
]
lib.wcdb_get_messages.restype = ctypes.c_int
lib.wcdb_get_message_count.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int32)]
lib.wcdb_get_message_count.restype = ctypes.c_int
lib.wcdb_get_display_names.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_display_names.restype = ctypes.c_int
lib.wcdb_get_avatar_urls.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_avatar_urls.restype = ctypes.c_int
lib.wcdb_get_group_member_count.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int32)]
lib.wcdb_get_group_member_count.restype = ctypes.c_int
lib.wcdb_get_group_members.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_group_members.restype = ctypes.c_int
lib.wcdb_get_logs.argtypes = [ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_logs.restype = ctypes.c_int
lib.wcdb_free_string.argtypes = [ctypes.c_char_p]
lib.wcdb_free_string.restype = None
_lib = lib
return lib
def _ensure_initialized() -> None:
global _initialized
lib = _load_wcdb_lib()
with _lib_lock:
if _initialized:
return
rc = int(lib.wcdb_init())
if rc != 0:
raise WCDBRealtimeError(f"wcdb_init failed: {rc}")
_initialized = True
def _safe_load_json(payload: str) -> Any:
try:
return json.loads(payload)
except Exception:
return None
def _call_out_json(fn, *args) -> str:
lib = _load_wcdb_lib()
out = ctypes.c_char_p()
rc = int(fn(*args, ctypes.byref(out)))
try:
if rc != 0:
logs = get_native_logs()
hint = ""
if logs:
hint = f" logs={logs[:6]}"
raise WCDBRealtimeError(f"wcdb api call failed: {rc}.{hint}")
raw = out.value or b""
try:
return raw.decode("utf-8", errors="replace")
except Exception:
return ""
finally:
try:
if out.value:
lib.wcdb_free_string(out)
except Exception:
pass
def get_native_logs() -> list[str]:
try:
_ensure_initialized()
except Exception:
return []
lib = _load_wcdb_lib()
out = ctypes.c_char_p()
rc = int(lib.wcdb_get_logs(ctypes.byref(out)))
try:
if rc != 0 or not out.value:
return []
payload = out.value.decode("utf-8", errors="replace")
decoded = _safe_load_json(payload)
if isinstance(decoded, list):
return [str(x) for x in decoded]
return []
except Exception:
return []
finally:
try:
if out.value:
lib.wcdb_free_string(out)
except Exception:
pass
def open_account(session_db_path: Path, key_hex: str) -> int:
_ensure_initialized()
p = Path(session_db_path)
if not p.exists():
raise WCDBRealtimeError(f"session db not found: {p}")
key = str(key_hex or "").strip()
if len(key) != 64:
raise WCDBRealtimeError("Invalid db key (must be 64 hex chars).")
lib = _load_wcdb_lib()
out_handle = ctypes.c_int64(0)
rc = int(lib.wcdb_open_account(str(p).encode("utf-8"), key.encode("utf-8"), ctypes.byref(out_handle)))
if rc != 0 or int(out_handle.value) <= 0:
logs = get_native_logs()
hint = f" logs={logs[:6]}" if logs else ""
raise WCDBRealtimeError(f"wcdb_open_account failed: {rc}.{hint}")
return int(out_handle.value)
def close_account(handle: int) -> None:
try:
h = int(handle)
except Exception:
return
if h <= 0:
return
try:
_ensure_initialized()
except Exception:
return
lib = _load_wcdb_lib()
try:
lib.wcdb_close_account(ctypes.c_int64(h))
except Exception:
return
def get_sessions(handle: int) -> list[dict[str, Any]]:
_ensure_initialized()
lib = _load_wcdb_lib()
payload = _call_out_json(lib.wcdb_get_sessions, ctypes.c_int64(int(handle)))
decoded = _safe_load_json(payload)
if isinstance(decoded, list):
out: list[dict[str, Any]] = []
for x in decoded:
if isinstance(x, dict):
out.append(x)
return out
return []
def get_messages(handle: int, username: str, *, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
_ensure_initialized()
lib = _load_wcdb_lib()
u = str(username or "").strip()
if not u:
return []
payload = _call_out_json(
lib.wcdb_get_messages,
ctypes.c_int64(int(handle)),
u.encode("utf-8"),
ctypes.c_int32(int(limit)),
ctypes.c_int32(int(offset)),
)
decoded = _safe_load_json(payload)
if isinstance(decoded, list):
out: list[dict[str, Any]] = []
for x in decoded:
if isinstance(x, dict):
out.append(x)
return out
return []
def get_message_count(handle: int, username: str) -> int:
_ensure_initialized()
lib = _load_wcdb_lib()
u = str(username or "").strip()
if not u:
return 0
out_count = ctypes.c_int32(0)
rc = int(lib.wcdb_get_message_count(ctypes.c_int64(int(handle)), u.encode("utf-8"), ctypes.byref(out_count)))
if rc != 0:
return 0
return int(out_count.value)
def get_display_names(handle: int, usernames: list[str]) -> dict[str, str]:
_ensure_initialized()
lib = _load_wcdb_lib()
uniq = [str(u or "").strip() for u in usernames if str(u or "").strip()]
uniq = list(dict.fromkeys(uniq))
if not uniq:
return {}
payload = json.dumps(uniq, ensure_ascii=False).encode("utf-8")
out_json = _call_out_json(lib.wcdb_get_display_names, ctypes.c_int64(int(handle)), payload)
decoded = _safe_load_json(out_json)
if isinstance(decoded, dict):
return {str(k): str(v) for k, v in decoded.items()}
return {}
def get_avatar_urls(handle: int, usernames: list[str]) -> dict[str, str]:
_ensure_initialized()
lib = _load_wcdb_lib()
uniq = [str(u or "").strip() for u in usernames if str(u or "").strip()]
uniq = list(dict.fromkeys(uniq))
if not uniq:
return {}
payload = json.dumps(uniq, ensure_ascii=False).encode("utf-8")
out_json = _call_out_json(lib.wcdb_get_avatar_urls, ctypes.c_int64(int(handle)), payload)
decoded = _safe_load_json(out_json)
if isinstance(decoded, dict):
return {str(k): str(v) for k, v in decoded.items()}
return {}
def shutdown() -> None:
global _initialized
lib = _load_wcdb_lib()
with _lib_lock:
if not _initialized:
return
try:
lib.wcdb_shutdown()
finally:
_initialized = False
def _resolve_session_db_path(db_storage_dir: Path) -> Path:
# Prefer current WeChat 4.x naming/layout:
# - db_storage/session/session.db
# - (fallback) db_storage/session.db
candidates = [
db_storage_dir / "session" / "session.db",
db_storage_dir / "session.db",
db_storage_dir / "Session.db",
db_storage_dir / "MicroMsg.db",
]
for c in candidates:
try:
if c.exists() and c.is_file():
return c
except Exception:
continue
# Fallback: recursive search (some installs keep DBs in subfolders).
try:
for p in db_storage_dir.rglob("session.db"):
try:
if p.exists() and p.is_file():
return p
except Exception:
continue
except Exception:
pass
try:
for p in db_storage_dir.rglob("MicroMsg.db"):
try:
if p.exists() and p.is_file():
return p
except Exception:
continue
except Exception:
pass
raise WCDBRealtimeError(f"Cannot find session db in: {db_storage_dir}")
@dataclass(frozen=True)
class WCDBRealtimeConnection:
account: str
handle: int
db_storage_dir: Path
session_db_path: Path
connected_at: float
lock: threading.Lock
class WCDBRealtimeManager:
def __init__(self) -> None:
self._mu = threading.Lock()
self._conns: dict[str, WCDBRealtimeConnection] = {}
self._connecting: dict[str, threading.Event] = {}
def get_status(self, account_dir: Path) -> dict[str, Any]:
account = str(account_dir.name)
key_item = get_account_keys_from_store(account)
key_hex = str((key_item or {}).get("db_key") or "").strip()
key_ok = len(key_hex) == 64
db_storage_dir = None
session_db_path = None
err = ""
try:
db_storage_dir = _resolve_account_db_storage_dir(account_dir)
if db_storage_dir is not None:
session_db_path = _resolve_session_db_path(db_storage_dir)
except Exception as e:
err = str(e)
dll_ok = _WCDB_API_DLL.exists()
connected = self.is_connected(account)
return {
"account": account,
"dll_present": bool(dll_ok),
"key_present": bool(key_ok),
"db_storage_dir": str(db_storage_dir) if db_storage_dir else "",
"session_db_path": str(session_db_path) if session_db_path else "",
"connected": bool(connected),
"error": err,
}
def is_connected(self, account: str) -> bool:
with self._mu:
conn = self._conns.get(str(account))
return bool(conn and conn.handle > 0)
def ensure_connected(self, account_dir: Path, *, key_hex: Optional[str] = None) -> WCDBRealtimeConnection:
account = str(account_dir.name)
while True:
with self._mu:
existing = self._conns.get(account)
if existing is not None and existing.handle > 0:
return existing
waiter = self._connecting.get(account)
if waiter is None:
waiter = threading.Event()
self._connecting[account] = waiter
break
# Another thread is connecting; wait a bit and retry.
waiter.wait(timeout=10.0)
key = str(key_hex or "").strip()
if not key:
key_item = get_account_keys_from_store(account)
key = str((key_item or {}).get("db_key") or "").strip()
if len(key) != 64:
raise WCDBRealtimeError("Missing db key for this account (call /api/keys or decrypt first).")
try:
db_storage_dir = _resolve_account_db_storage_dir(account_dir)
if db_storage_dir is None:
raise WCDBRealtimeError("Cannot resolve db_storage directory for this account.")
session_db_path = _resolve_session_db_path(db_storage_dir)
handle = open_account(session_db_path, key)
conn = WCDBRealtimeConnection(
account=account,
handle=handle,
db_storage_dir=db_storage_dir,
session_db_path=session_db_path,
connected_at=time.time(),
lock=threading.Lock(),
)
with self._mu:
self._conns[account] = conn
logger.info(f"[wcdb] connected account={account} session_db={session_db_path}")
return conn
finally:
with self._mu:
ev = self._connecting.pop(account, None)
if ev is not None:
ev.set()
def disconnect(self, account: str) -> None:
a = str(account or "").strip()
if not a:
return
with self._mu:
conn = self._conns.pop(a, None)
if conn is None:
return
try:
with conn.lock:
close_account(conn.handle)
except Exception:
pass
def close_all(self) -> None:
with self._mu:
conns = list(self._conns.values())
self._conns.clear()
for conn in conns:
try:
with conn.lock:
close_account(conn.handle)
except Exception:
continue
WCDB_REALTIME = WCDBRealtimeManager()