mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
Compare commits
4 Commits
@@ -10,7 +10,7 @@
|
||||
<img src="https://img.shields.io/github/stars/LifeArchiveProject/WeChatDataAnalysis" alt="Stars" />
|
||||
<img src="https://gh-down-badges.linkof.link/LifeArchiveProject/WeChatDataAnalysis" alt="Downloads" />
|
||||
<img src="https://img.shields.io/github/forks/LifeArchiveProject/WeChatDataAnalysis" alt="Forks" />
|
||||
<a href="https://qm.qq.com/q/VQEQ7PcGkk"><img src="https://img.shields.io/badge/QQ%20Group-WeChatDataAnalysis-12B7F5?logo=tencentqq&logoColor=white" alt="QQ Group" /></a>
|
||||
<a href="https://qm.qq.com/q/VQEQ7PcGkk"><img src="https://img.shields.io/badge/QQ Group-WeChatDataAnalysis-12B7F5?logo=tencentqq&logoColor=white" alt="QQ Group" /></a>
|
||||
<img src="https://img.shields.io/badge/Python-3776AB?logo=Python&logoColor=white" alt="Python" />
|
||||
<img src="https://img.shields.io/badge/Vue.js-4FC08D?logo=Vue.js&logoColor=white" alt="Vue.js" />
|
||||
<img src="https://img.shields.io/badge/SQLite-003B57?logo=SQLite&logoColor=white" alt="SQLite" />
|
||||
|
||||
@@ -24,6 +24,17 @@ logger = get_logger(__name__)
|
||||
|
||||
_OUTPUT_DATABASES_DIR = get_output_databases_dir()
|
||||
_DEBUG_SESSIONS = os.environ.get("WECHAT_TOOL_DEBUG_SESSIONS", "0") == "1"
|
||||
_SQLITE_HEADER = b"SQLite format 3\x00"
|
||||
|
||||
|
||||
def _is_valid_decrypted_sqlite(path: Path) -> bool:
|
||||
try:
|
||||
if not path.exists() or (not path.is_file()):
|
||||
return False
|
||||
with path.open("rb") as f:
|
||||
return f.read(len(_SQLITE_HEADER)) == _SQLITE_HEADER
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _list_decrypted_accounts() -> list[str]:
|
||||
@@ -34,7 +45,7 @@ def _list_decrypted_accounts() -> list[str]:
|
||||
for p in _OUTPUT_DATABASES_DIR.iterdir():
|
||||
if not p.is_dir():
|
||||
continue
|
||||
if (p / "session.db").exists() and (p / "contact.db").exists():
|
||||
if _is_valid_decrypted_sqlite(p / "session.db") and _is_valid_decrypted_sqlite(p / "contact.db"):
|
||||
accounts.append(p.name)
|
||||
|
||||
accounts.sort()
|
||||
@@ -49,7 +60,9 @@ def _resolve_account_dir(account: Optional[str]) -> Path:
|
||||
detail="No decrypted databases found. Please decrypt first.",
|
||||
)
|
||||
|
||||
selected = account or accounts[0]
|
||||
selected = str(account or "").strip() or accounts[0]
|
||||
if selected not in accounts:
|
||||
raise HTTPException(status_code=404, detail="Account not found.")
|
||||
base = _OUTPUT_DATABASES_DIR.resolve()
|
||||
candidate = (_OUTPUT_DATABASES_DIR / selected).resolve()
|
||||
|
||||
|
||||
@@ -24,6 +24,17 @@ logger = get_logger(__name__)
|
||||
|
||||
# 运行时输出目录(桌面端可通过 WECHAT_TOOL_DATA_DIR 指向可写目录)
|
||||
_PACKAGE_ROOT = Path(__file__).resolve().parent
|
||||
_SQLITE_HEADER = b"SQLite format 3\x00"
|
||||
|
||||
|
||||
def _is_valid_decrypted_sqlite(path: Path) -> bool:
|
||||
try:
|
||||
if not path.exists() or (not path.is_file()):
|
||||
return False
|
||||
with path.open("rb") as f:
|
||||
return f.read(len(_SQLITE_HEADER)) == _SQLITE_HEADER
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _list_decrypted_accounts() -> list[str]:
|
||||
@@ -36,7 +47,7 @@ def _list_decrypted_accounts() -> list[str]:
|
||||
for p in output_db_dir.iterdir():
|
||||
if not p.is_dir():
|
||||
continue
|
||||
if (p / "session.db").exists() and (p / "contact.db").exists():
|
||||
if _is_valid_decrypted_sqlite(p / "session.db") and _is_valid_decrypted_sqlite(p / "contact.db"):
|
||||
accounts.append(p.name)
|
||||
|
||||
accounts.sort()
|
||||
@@ -53,7 +64,9 @@ def _resolve_account_dir(account: Optional[str]) -> Path:
|
||||
detail="No decrypted databases found. Please decrypt first.",
|
||||
)
|
||||
|
||||
selected = account or accounts[0]
|
||||
selected = str(account or "").strip() or accounts[0]
|
||||
if selected not in accounts:
|
||||
raise HTTPException(status_code=404, detail="Account not found.")
|
||||
base = output_db_dir.resolve()
|
||||
candidate = (output_db_dir / selected).resolve()
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from ..app_paths import get_output_databases_dir
|
||||
from ..logging_config import get_logger
|
||||
from ..path_fix import PathFixRoute
|
||||
from ..key_store import upsert_account_keys_in_store
|
||||
from ..wechat_decrypt import WeChatDatabaseDecryptor, decrypt_wechat_databases
|
||||
from ..wechat_decrypt import WeChatDatabaseDecryptor, decrypt_wechat_databases, scan_account_databases_from_path
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -79,6 +79,8 @@ async def decrypt_databases(request: DecryptRequest):
|
||||
"account_results": results.get("account_results", {}),
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"解密API异常: {str(e)}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
@@ -126,44 +128,17 @@ async def decrypt_databases_stream(
|
||||
yield _sse({"type": "scanning", "message": "正在扫描数据库文件..."})
|
||||
await asyncio.sleep(0)
|
||||
|
||||
account_name = "unknown_account"
|
||||
path_parts = storage_path.parts
|
||||
account_patterns = ["wxid_"]
|
||||
for part in path_parts:
|
||||
for pattern in account_patterns:
|
||||
if part.startswith(pattern):
|
||||
parts = part.split("_")
|
||||
if len(parts) >= 3:
|
||||
account_name = "_".join(parts[:-1])
|
||||
else:
|
||||
account_name = part
|
||||
break
|
||||
if account_name != "unknown_account":
|
||||
break
|
||||
|
||||
if account_name == "unknown_account":
|
||||
for part in reversed(path_parts):
|
||||
if part != "db_storage" and len(part) > 3:
|
||||
account_name = part
|
||||
break
|
||||
|
||||
databases: list[dict] = []
|
||||
for root, _dirs, files in os.walk(storage_path):
|
||||
if "db_storage" not in str(root):
|
||||
continue
|
||||
for file_name in files:
|
||||
if not file_name.endswith(".db"):
|
||||
continue
|
||||
if file_name in ["key_info.db"]:
|
||||
continue
|
||||
db_path = os.path.join(root, file_name)
|
||||
databases.append({"path": db_path, "name": file_name, "account": account_name})
|
||||
|
||||
if not databases:
|
||||
yield _sse({"type": "error", "message": "未找到微信数据库文件!请检查 db_storage_path 是否正确"})
|
||||
scan_result = scan_account_databases_from_path(p)
|
||||
if scan_result["status"] == "error":
|
||||
payload = {"type": "error", "message": scan_result["message"]}
|
||||
detected_accounts = scan_result.get("detected_accounts") or []
|
||||
if detected_accounts:
|
||||
payload["detected_accounts"] = detected_accounts
|
||||
yield _sse(payload)
|
||||
return
|
||||
|
||||
account_databases = {account_name: databases}
|
||||
account_databases = scan_result.get("account_databases", {})
|
||||
account_sources = scan_result.get("account_sources", {})
|
||||
total_databases = sum(len(dbs) for dbs in account_databases.values())
|
||||
|
||||
yield _sse({"type": "start", "total": total_databases, "message": f"开始解密 {total_databases} 个数据库"})
|
||||
@@ -193,12 +168,9 @@ async def decrypt_databases_stream(
|
||||
|
||||
# Save a hint for later UI (same as non-stream endpoint).
|
||||
try:
|
||||
source_db_storage_path = p
|
||||
wxid_dir = ""
|
||||
if storage_path.name.lower() == "db_storage":
|
||||
wxid_dir = str(storage_path.parent)
|
||||
else:
|
||||
wxid_dir = str(storage_path)
|
||||
source_info = account_sources.get(account, {})
|
||||
source_db_storage_path = str(source_info.get("db_storage_path") or p)
|
||||
wxid_dir = str(source_info.get("wxid_dir") or "")
|
||||
(account_output_dir / "_source.json").write_text(
|
||||
json.dumps({"db_storage_path": source_db_storage_path, "wxid_dir": wxid_dir}, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
|
||||
@@ -26,25 +26,41 @@ _DEFAULT_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
|
||||
_WCDB_API_DLL_SELECTED: Optional[Path] = None
|
||||
|
||||
|
||||
def _is_project_wcdb_api_dll_path(path: Path) -> bool:
|
||||
try:
|
||||
resolved = path.resolve(strict=False)
|
||||
except Exception:
|
||||
resolved = path
|
||||
|
||||
try:
|
||||
default_resolved = _DEFAULT_WCDB_API_DLL.resolve(strict=False)
|
||||
except Exception:
|
||||
default_resolved = _DEFAULT_WCDB_API_DLL
|
||||
|
||||
if resolved == default_resolved:
|
||||
return True
|
||||
|
||||
parts = tuple(str(part).lower() for part in resolved.parts)
|
||||
allowed_suffixes = (
|
||||
("backend", "native", "wcdb_api.dll"),
|
||||
("wechat_decrypt_tool", "native", "wcdb_api.dll"),
|
||||
)
|
||||
return any(parts[-len(suffix) :] == suffix for suffix in allowed_suffixes)
|
||||
|
||||
|
||||
def _candidate_wcdb_api_dll_paths() -> list[Path]:
|
||||
"""Return possible locations for wcdb_api.dll (prefer WeFlow's newer build when present)."""
|
||||
"""Return allowed locations for wcdb_api.dll."""
|
||||
cands: list[Path] = []
|
||||
|
||||
env = str(os.environ.get("WECHAT_TOOL_WCDB_API_DLL_PATH", "") or "").strip()
|
||||
if env:
|
||||
cands.append(Path(env))
|
||||
env_path = Path(env)
|
||||
if _is_project_wcdb_api_dll_path(env_path):
|
||||
cands.append(env_path)
|
||||
else:
|
||||
logger.warning("[wcdb] ignore external wcdb_api.dll override: %s", env_path)
|
||||
|
||||
# Repo checkout convenience: reuse bundled WeFlow / echotrace DLLs when available.
|
||||
try:
|
||||
repo_root = Path(__file__).resolve().parents[2]
|
||||
except Exception:
|
||||
repo_root = Path.cwd()
|
||||
|
||||
for p in [
|
||||
repo_root / "WeFlow" / "resources" / "wcdb_api.dll",
|
||||
repo_root / "echotrace" / "assets" / "dll" / "wcdb_api.dll",
|
||||
_DEFAULT_WCDB_API_DLL,
|
||||
]:
|
||||
for p in (_DEFAULT_WCDB_API_DLL,):
|
||||
if p not in cands:
|
||||
cands.append(p)
|
||||
|
||||
|
||||
@@ -27,6 +27,169 @@ from .app_paths import get_output_databases_dir
|
||||
# SQLite文件头
|
||||
SQLITE_HEADER = b"SQLite format 3\x00"
|
||||
|
||||
|
||||
def _normalize_account_name(name: str) -> str:
|
||||
value = str(name or "").strip()
|
||||
if not value:
|
||||
return "unknown_account"
|
||||
|
||||
if value.startswith("wxid_"):
|
||||
parts = value.split("_")
|
||||
if len(parts) >= 3:
|
||||
trimmed = "_".join(parts[:-1]).strip()
|
||||
if trimmed:
|
||||
return trimmed
|
||||
|
||||
return value
|
||||
|
||||
|
||||
def _derive_account_name_from_path(path: Path) -> str:
|
||||
try:
|
||||
target = path.resolve()
|
||||
except Exception:
|
||||
target = path
|
||||
|
||||
for part in target.parts:
|
||||
part_str = str(part or "").strip()
|
||||
if part_str.startswith("wxid_"):
|
||||
return _normalize_account_name(part_str)
|
||||
|
||||
for part in reversed(target.parts):
|
||||
part_str = str(part or "").strip()
|
||||
if not part_str or part_str.lower() == "db_storage" or len(part_str) <= 3:
|
||||
continue
|
||||
return _normalize_account_name(part_str)
|
||||
|
||||
return "unknown_account"
|
||||
|
||||
|
||||
def _resolve_db_storage_roots(storage_path: Path) -> list[Path]:
|
||||
try:
|
||||
target = storage_path.resolve()
|
||||
except Exception:
|
||||
target = storage_path
|
||||
|
||||
if not target.exists():
|
||||
return []
|
||||
|
||||
current = target if target.is_dir() else target.parent
|
||||
probe = current
|
||||
while True:
|
||||
if probe.name.lower() == "db_storage":
|
||||
return [probe]
|
||||
parent = probe.parent
|
||||
if parent == probe:
|
||||
break
|
||||
probe = parent
|
||||
|
||||
roots: list[Path] = []
|
||||
try:
|
||||
for root, dirs, _files in os.walk(current):
|
||||
root_path = Path(root)
|
||||
if root_path.name.lower() != "db_storage":
|
||||
continue
|
||||
roots.append(root_path)
|
||||
dirs[:] = []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
uniq: list[Path] = []
|
||||
seen: set[str] = set()
|
||||
for root in roots:
|
||||
key = str(root)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
uniq.append(root)
|
||||
uniq.sort(key=lambda p: str(p).lower())
|
||||
return uniq
|
||||
|
||||
|
||||
def scan_account_databases_from_path(db_storage_path: str) -> dict:
|
||||
storage_path = Path(str(db_storage_path or "").strip())
|
||||
if not storage_path.exists():
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"指定的数据库路径不存在: {db_storage_path}",
|
||||
"account_databases": {},
|
||||
"account_sources": {},
|
||||
"detected_accounts": [],
|
||||
}
|
||||
|
||||
db_roots = _resolve_db_storage_roots(storage_path)
|
||||
if not db_roots:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "未找到微信数据库文件!请确保路径指向具体账号的 db_storage 目录。",
|
||||
"account_databases": {},
|
||||
"account_sources": {},
|
||||
"detected_accounts": [],
|
||||
}
|
||||
|
||||
detected_accounts = [
|
||||
{
|
||||
"account": _derive_account_name_from_path(root),
|
||||
"db_storage_path": str(root),
|
||||
"wxid_dir": str(root.parent),
|
||||
}
|
||||
for root in db_roots
|
||||
]
|
||||
|
||||
if len(db_roots) > 1:
|
||||
account_names = ", ".join(
|
||||
[str(item.get("account") or item.get("db_storage_path") or "").strip() for item in detected_accounts]
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": (
|
||||
"检测到多个账号目录,请选择具体账号的 db_storage 目录后再解密,"
|
||||
f"不要直接选择上级目录。当前检测到: {account_names}"
|
||||
),
|
||||
"account_databases": {},
|
||||
"account_sources": {},
|
||||
"detected_accounts": detected_accounts,
|
||||
}
|
||||
|
||||
db_root = db_roots[0]
|
||||
account_name = _derive_account_name_from_path(db_root)
|
||||
databases: list[dict] = []
|
||||
for root, _dirs, files in os.walk(db_root):
|
||||
for file_name in files:
|
||||
if not file_name.endswith(".db"):
|
||||
continue
|
||||
if file_name in ["key_info.db"]:
|
||||
continue
|
||||
db_path = os.path.join(root, file_name)
|
||||
databases.append(
|
||||
{
|
||||
"path": db_path,
|
||||
"name": file_name,
|
||||
"account": account_name,
|
||||
}
|
||||
)
|
||||
|
||||
if not databases:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "未找到微信数据库文件!请检查 db_storage_path 是否正确",
|
||||
"account_databases": {},
|
||||
"account_sources": {},
|
||||
"detected_accounts": detected_accounts,
|
||||
}
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "",
|
||||
"account_databases": {account_name: databases},
|
||||
"account_sources": {
|
||||
account_name: {
|
||||
"db_storage_path": str(db_root),
|
||||
"wxid_dir": str(db_root.parent),
|
||||
}
|
||||
},
|
||||
"detected_accounts": detected_accounts,
|
||||
}
|
||||
|
||||
def setup_logging():
|
||||
"""设置日志配置 - 已弃用,使用统一的日志配置"""
|
||||
from .logging_config import setup_logging as unified_setup_logging
|
||||
@@ -259,75 +422,28 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
|
||||
|
||||
# 查找数据库文件并按账号组织
|
||||
account_databases = {} # {account_name: [db_info, ...]}
|
||||
account_sources = {}
|
||||
detected_accounts = []
|
||||
|
||||
if db_storage_path:
|
||||
# 使用指定路径查找数据库
|
||||
storage_path = Path(db_storage_path)
|
||||
|
||||
if storage_path.exists():
|
||||
# 尝试从路径中提取账号名
|
||||
account_name = "unknown_account"
|
||||
path_parts = storage_path.parts
|
||||
|
||||
# 常见的微信账号格式模式
|
||||
account_patterns = ['wxid_']
|
||||
|
||||
for part in path_parts:
|
||||
# 检查是否匹配已知的账号格式
|
||||
for pattern in account_patterns:
|
||||
if part.startswith(pattern):
|
||||
# 提取主要部分,去掉后面的随机后缀
|
||||
# 例如:wxid_v4mbduwqtzpt22_1e7a -> wxid_v4mbduwqtzpt22
|
||||
parts = part.split('_')
|
||||
if len(parts) >= 3: # wxid_主要部分_随机后缀
|
||||
account_name = '_'.join(parts[:-1]) # 去掉最后一个随机部分
|
||||
else:
|
||||
account_name = part # 如果格式不符合预期,保留原名
|
||||
break
|
||||
if account_name != "unknown_account":
|
||||
break
|
||||
|
||||
# 如果没有匹配到已知格式,使用包含数据库的目录名
|
||||
if account_name == "unknown_account":
|
||||
# 查找包含db_storage的父目录作为账号名
|
||||
for part in reversed(path_parts):
|
||||
if part != "db_storage" and len(part) > 3:
|
||||
account_name = part
|
||||
break
|
||||
|
||||
databases = []
|
||||
# 使用递归查找,与自动检测逻辑一致
|
||||
for root, dirs, files in os.walk(storage_path):
|
||||
# 只处理db_storage目录下的数据库文件
|
||||
if "db_storage" not in str(root):
|
||||
continue
|
||||
for file_name in files:
|
||||
if not file_name.endswith(".db"):
|
||||
continue
|
||||
# 排除不需要解密的数据库
|
||||
if file_name in ["key_info.db"]:
|
||||
continue
|
||||
db_path = os.path.join(root, file_name)
|
||||
databases.append({
|
||||
'path': db_path,
|
||||
'name': file_name,
|
||||
'account': account_name
|
||||
})
|
||||
|
||||
if databases:
|
||||
account_databases[account_name] = databases
|
||||
logger.info(f"在指定路径找到账号 {account_name} 的 {len(databases)} 个数据库文件")
|
||||
else:
|
||||
scan_result = scan_account_databases_from_path(db_storage_path)
|
||||
detected_accounts = scan_result.get("detected_accounts", [])
|
||||
if scan_result["status"] == "error":
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"指定的数据库路径不存在: {db_storage_path}",
|
||||
"message": scan_result["message"],
|
||||
"total_databases": 0,
|
||||
"successful_count": 0,
|
||||
"failed_count": 0,
|
||||
"output_directory": str(base_output_dir.absolute()),
|
||||
"processed_files": [],
|
||||
"failed_files": []
|
||||
"failed_files": [],
|
||||
"detected_accounts": scan_result.get("detected_accounts", []),
|
||||
}
|
||||
account_databases = scan_result.get("account_databases", {})
|
||||
account_sources = scan_result.get("account_sources", {})
|
||||
for account_name, databases in account_databases.items():
|
||||
logger.info(f"在指定路径找到账号 {account_name} 的 {len(databases)} 个数据库文件")
|
||||
else:
|
||||
# 不再支持自动检测,要求用户提供具体的db_storage_path
|
||||
return {
|
||||
@@ -387,14 +503,9 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
|
||||
logger.info(f"账号 {account_name} 输出目录: {account_output_dir}")
|
||||
|
||||
try:
|
||||
source_db_storage_path = str(db_storage_path or "")
|
||||
wxid_dir = ""
|
||||
if db_storage_path:
|
||||
sp = Path(db_storage_path)
|
||||
if sp.name.lower() == "db_storage":
|
||||
wxid_dir = str(sp.parent)
|
||||
else:
|
||||
wxid_dir = str(sp)
|
||||
source_info = account_sources.get(account_name, {})
|
||||
source_db_storage_path = str(source_info.get("db_storage_path") or db_storage_path or "")
|
||||
wxid_dir = str(source_info.get("wxid_dir") or "")
|
||||
(account_output_dir / "_source.json").write_text(
|
||||
json.dumps(
|
||||
{
|
||||
@@ -473,7 +584,8 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
|
||||
"output_directory": str(base_output_dir.absolute()),
|
||||
"processed_files": processed_files,
|
||||
"failed_files": failed_files,
|
||||
"account_results": account_results # 新增:按账号的详细结果
|
||||
"account_results": account_results, # 新增:按账号的详细结果
|
||||
"detected_accounts": detected_accounts,
|
||||
}
|
||||
|
||||
logger.info("=" * 60)
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
from wechat_decrypt_tool import wcdb_realtime
|
||||
|
||||
|
||||
class TestWcdbRealtimeDllPathSelection(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
wcdb_realtime._WCDB_API_DLL_SELECTED = None
|
||||
|
||||
def tearDown(self) -> None:
|
||||
wcdb_realtime._WCDB_API_DLL_SELECTED = None
|
||||
|
||||
def test_resolve_prefers_project_dll_over_weflow(self) -> None:
|
||||
weflow_dll = ROOT / "WeFlow" / "resources" / "wcdb_api.dll"
|
||||
self.assertTrue(weflow_dll.exists())
|
||||
self.assertTrue(wcdb_realtime._DEFAULT_WCDB_API_DLL.exists())
|
||||
|
||||
with patch.dict(os.environ, {"WECHAT_TOOL_WCDB_API_DLL_PATH": str(weflow_dll)}, clear=False):
|
||||
resolved = wcdb_realtime._resolve_wcdb_api_dll_path()
|
||||
|
||||
self.assertEqual(
|
||||
resolved.resolve(),
|
||||
wcdb_realtime._DEFAULT_WCDB_API_DLL.resolve(),
|
||||
)
|
||||
|
||||
def test_resolve_accepts_project_packaged_override(self) -> None:
|
||||
packaged_dll = ROOT / "desktop" / "resources" / "backend" / "native" / "wcdb_api.dll"
|
||||
self.assertTrue(packaged_dll.exists())
|
||||
|
||||
with patch.dict(os.environ, {"WECHAT_TOOL_WCDB_API_DLL_PATH": str(packaged_dll)}, clear=False):
|
||||
resolved = wcdb_realtime._resolve_wcdb_api_dll_path()
|
||||
|
||||
self.assertEqual(resolved.resolve(), packaged_dll.resolve())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user