fix(wcdb): 传给 native 的 wxid 与 db_storage 解析对齐 WeFlow

- 调用 wcdb_set_my_wxid 前去掉导出目录名的 4 位随机后缀(如 Murderers_0e5d -> Murderers),否则 native 会话/消息查询可能只返回极少结果。

- media_helpers 的 db_storage 路径解析改用 WeFlow 的 resolveDbStoragePath 规则:直接子目录、wxid 子目录、带后缀的 wxid 目录、向上回查、有限深度递归兜底,保证用户选到任意层级都能正确定位。

- 连接日志额外打印 native_wxid,便于排查匹配错位问题。
This commit is contained in:
2977094657
2026-04-29 17:25:46 +08:00
Unverified
parent a83f9b774c
commit 26c8f78d5b
2 changed files with 201 additions and 23 deletions
+144 -20
View File
@@ -1184,6 +1184,131 @@ def _load_account_source_info(account_dir: Path) -> dict[str, Any]:
return {}
def _clean_weflow_account_dir_name(dir_name: str) -> str:
"""按 WeFlow 的账号目录规则清理 wxid。
WeFlow 在连接 WCDB 前会把形如 `xxx_abcd` 的账号目录清理为 `xxx`
再传给 native `wcdb_set_my_wxid`。这里保持同样规则,避免 suffix 目录名
影响实时读取。
"""
trimmed = str(dir_name or "").strip()
if not trimmed:
return trimmed
if trimmed.lower().startswith("wxid_"):
match = re.match(r"^(wxid_[^_]+)", trimmed, flags=re.IGNORECASE)
if match:
return match.group(1)
return trimmed
suffix_match = re.match(r"^(.+)_([a-zA-Z0-9]{4})$", trimmed)
return suffix_match.group(1) if suffix_match else trimmed
def _find_db_storage_recursive(dir_path: Path, max_depth: int) -> Optional[Path]:
"""有限深度递归查找 db_storage,逻辑对齐 WeFlow。"""
if max_depth <= 0:
return None
try:
entries = list(dir_path.iterdir())
except Exception:
return None
for entry in entries:
try:
if entry.is_dir() and entry.name.lower() == "db_storage":
return entry
except Exception:
continue
for entry in entries:
try:
if not entry.is_dir():
continue
except Exception:
continue
found = _find_db_storage_recursive(entry, max_depth - 1)
if found is not None:
return found
return None
def _resolve_db_storage_path_like_weflow(base_path: str | Path, account_name: str) -> Optional[Path]:
"""按 WeFlow 的 resolveDbStoragePath 规则解析 db_storage。"""
raw = str(base_path or "").strip()
if not raw:
return None
try:
normalized = Path(raw).expanduser()
except Exception:
normalized = Path(raw)
def existing_dir(candidate: Path) -> Optional[Path]:
try:
return candidate if candidate.exists() and candidate.is_dir() else None
except Exception:
return None
direct_self = existing_dir(normalized)
if direct_self is not None and direct_self.name.lower() == "db_storage":
return direct_self
direct_child = existing_dir(normalized / "db_storage")
if direct_child is not None:
return direct_child
wxid_candidates: list[str] = []
for item in (account_name, _clean_weflow_account_dir_name(account_name)):
item = str(item or "").strip()
if item and item not in wxid_candidates:
wxid_candidates.append(item)
for wxid in wxid_candidates:
via_wxid = existing_dir(normalized / wxid / "db_storage")
if via_wxid is not None:
return via_wxid
# 兼容目录名包含额外后缀(如 wxid_xxx_1234)。
try:
entries = list(normalized.iterdir())
except Exception:
entries = []
lower_wxid = wxid.lower()
for entry in entries:
try:
if not entry.is_dir():
continue
except Exception:
continue
lower_entry = entry.name.lower()
if lower_entry == lower_wxid or lower_entry.startswith(f"{lower_wxid}_"):
candidate = existing_dir(entry / "db_storage")
if candidate is not None:
return candidate
# 兜底:向上查找 db_storage(最多 2 级),处理用户选择了子目录的情况。
try:
parent = normalized
for _ in range(2):
up = parent.parent
if up == parent:
break
parent = up
candidate_up = existing_dir(parent / "db_storage")
if candidate_up is not None:
return candidate_up
for wxid in wxid_candidates:
via_wxid_up = existing_dir(parent / wxid / "db_storage")
if via_wxid_up is not None:
return via_wxid_up
except Exception:
pass
return _find_db_storage_recursive(normalized, 3)
def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
try:
home = Path.home()
@@ -1195,14 +1320,18 @@ def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
home / "Documents" / "WeChat Files",
]
candidates = [account_name, _clean_weflow_account_dir_name(account_name)]
candidates = [x for i, x in enumerate(candidates) if x and x not in candidates[:i]]
# Exact match first
for root in roots:
c = root / account_name
try:
if c.exists() and c.is_dir():
return c
except Exception:
continue
for name in candidates:
c = root / name
try:
if c.exists() and c.is_dir():
return c
except Exception:
continue
# Then try prefix match: wxid_xxx_yyyy
for root in roots:
@@ -1212,8 +1341,9 @@ def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
for p in root.iterdir():
if not p.is_dir():
continue
if p.name.startswith(account_name + "_"):
return p
for name in candidates:
if p.name.startswith(name + "_"):
return p
except Exception:
continue
return None
@@ -1236,21 +1366,15 @@ def _resolve_account_db_storage_dir(account_dir: Path) -> Optional[Path]:
info = _load_account_source_info(account_dir)
db_storage_path = str(info.get("db_storage_path") or "").strip()
if db_storage_path:
try:
p = Path(db_storage_path)
if p.exists() and p.is_dir():
return p
except Exception:
pass
resolved = _resolve_db_storage_path_like_weflow(db_storage_path, account_dir.name)
if resolved is not None:
return resolved
wxid_dir = _resolve_account_wxid_dir(account_dir)
if wxid_dir:
c = wxid_dir / "db_storage"
try:
if c.exists() and c.is_dir():
return c
except Exception:
pass
resolved = _resolve_db_storage_path_like_weflow(wxid_dir, account_dir.name)
if resolved is not None:
return resolved
return None
+57 -3
View File
@@ -26,6 +26,46 @@ class WCDBRealtimeError(RuntimeError):
pass
def _clean_weflow_account_dir_name(dir_name: str) -> str:
"""调用 WCDB 前使用与 WeFlow 相同的账号/wxid 清理规则。"""
trimmed = str(dir_name or "").strip()
if not trimmed:
return trimmed
if trimmed.lower().startswith("wxid_"):
match = re.match(r"^(wxid_[^_]+)", trimmed, flags=re.IGNORECASE)
if match:
return match.group(1)
return trimmed
suffix_match = re.match(r"^(.+)_([a-zA-Z0-9]{4})$", trimmed)
return suffix_match.group(1) if suffix_match else trimmed
def _derive_weflow_wcdb_wxid(account: str, db_storage_dir: Optional[Path] = None) -> str:
"""推导传给 native WCDB 的 wxid,语义对齐 WeFlow。
output 账号目录可能带随机后缀,例如 `Murderers_0e5d`。
WeFlow 在调用 `wcdb_set_my_wxid` 前会去掉这个后缀;如果传带后缀的名字,
native 会话/消息查询可能只返回很少结果。
"""
candidates: list[str] = []
if db_storage_dir is not None:
try:
parent_name = Path(db_storage_dir).parent.name
if parent_name:
candidates.append(parent_name)
except Exception:
pass
candidates.append(str(account or ""))
for item in candidates:
cleaned = _clean_weflow_account_dir_name(item)
if cleaned:
return cleaned
return str(account or "").strip()
_NATIVE_DIR = Path(__file__).resolve().parent / "native"
_DEFAULT_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
_WCDB_API_DLL_SELECTED: Optional[Path] = None
@@ -1459,6 +1499,7 @@ def _resolve_session_db_path(db_storage_dir: Path) -> Path:
@dataclass(frozen=True)
class WCDBRealtimeConnection:
account: str
native_wxid: str
handle: int
db_storage_dir: Path
session_db_path: Path
@@ -1484,13 +1525,16 @@ class WCDBRealtimeManager:
db_storage_dir = None
session_db_path = None
native_wxid = ""
err = ""
try:
db_storage_dir = _resolve_account_db_storage_dir(account_dir)
if db_storage_dir is not None:
native_wxid = _derive_weflow_wcdb_wxid(account, db_storage_dir)
session_db_path = _resolve_session_db_path(db_storage_dir)
except Exception as e:
err = str(e)
native_wxid = _derive_weflow_wcdb_wxid(account, db_storage_dir)
dll_path = _resolve_wcdb_api_dll_path()
try:
@@ -1503,6 +1547,7 @@ class WCDBRealtimeManager:
"dll_present": bool(dll_ok),
"wcdb_api_dll": str(dll_path),
"key_present": bool(key_ok),
"native_wxid": native_wxid,
"db_storage_dir": str(db_storage_dir) if db_storage_dir else "",
"session_db_path": str(session_db_path) if session_db_path else "",
"connected": bool(connected),
@@ -1565,6 +1610,7 @@ class WCDBRealtimeManager:
raise WCDBRealtimeError("Cannot resolve db_storage directory for this account.")
session_db_path = _resolve_session_db_path(db_storage_dir)
native_wxid = _derive_weflow_wcdb_wxid(account, db_storage_dir)
# Run open_account in a daemon thread with a timeout to avoid
# blocking indefinitely when the native library hangs (locked DB).
@@ -1609,14 +1655,16 @@ class WCDBRealtimeManager:
raise WCDBRealtimeError("open_account returned no handle.")
handle = _handle_box[0]
# Some WCDB APIs (e.g. exec_query on non-session DBs) may require this context.
# 对齐 WeFlow:传清理后的 wxid/account 名称给 native WCDB
# 不传带 4 位随机后缀的导出目录名。
try:
set_my_wxid(handle, account)
set_my_wxid(handle, native_wxid)
except Exception:
pass
conn = WCDBRealtimeConnection(
account=account,
native_wxid=native_wxid,
handle=handle,
db_storage_dir=db_storage_dir,
session_db_path=session_db_path,
@@ -1627,7 +1675,13 @@ class WCDBRealtimeManager:
with self._mu:
self._conns[account] = conn
self._failed.pop(account, None)
logger.info("[wcdb] connected account=%s handle=%s session_db=%s", account, int(handle), session_db_path)
logger.info(
"[wcdb] connected account=%s native_wxid=%s handle=%s session_db=%s",
account,
native_wxid,
int(handle),
session_db_path,
)
return conn
finally:
with self._mu: