diff --git a/frontend/app.vue b/frontend/app.vue index cb55b98..f1b0426 100644 --- a/frontend/app.vue +++ b/frontend/app.vue @@ -30,7 +30,7 @@ onBeforeUnmount(() => { }) const route = useRoute() -const isChatRoute = computed(() => route.path?.startsWith('/chat') || route.path?.startsWith('/sns')) +const isChatRoute = computed(() => route.path?.startsWith('/chat') || route.path?.startsWith('/sns') || route.path?.startsWith('/contacts')) const rootClass = computed(() => { const base = 'bg-gradient-to-br from-green-50 via-emerald-50 to-green-100' diff --git a/frontend/composables/useApi.js b/frontend/composables/useApi.js index cbf1c5f..63d1209 100644 --- a/frontend/composables/useApi.js +++ b/frontend/composables/useApi.js @@ -292,6 +292,7 @@ export const useApi = () => { message_types: Array.isArray(data.message_types) ? data.message_types : [], include_media: data.include_media == null ? true : !!data.include_media, media_kinds: Array.isArray(data.media_kinds) ? data.media_kinds : ['image', 'emoji', 'video', 'video_thumb', 'voice', 'file'], + output_dir: data.output_dir == null ? null : String(data.output_dir || '').trim(), allow_process_key_extract: !!data.allow_process_key_extract, privacy_mode: !!data.privacy_mode, file_name: data.file_name || null @@ -313,6 +314,36 @@ export const useApi = () => { return await request(`/chat/exports/${encodeURIComponent(String(exportId))}`, { method: 'DELETE' }) } + // 联系人 + const listChatContacts = async (params = {}) => { + const query = new URLSearchParams() + if (params && params.account) query.set('account', params.account) + if (params && params.keyword) query.set('keyword', params.keyword) + if (params && params.include_friends != null) query.set('include_friends', String(!!params.include_friends)) + if (params && params.include_groups != null) query.set('include_groups', String(!!params.include_groups)) + if (params && params.include_officials != null) query.set('include_officials', String(!!params.include_officials)) + const url = '/chat/contacts' + (query.toString() ? `?${query.toString()}` : '') + return await request(url) + } + + const exportChatContacts = async (payload = {}) => { + return await request('/chat/contacts/export', { + method: 'POST', + body: { + account: payload.account || null, + output_dir: payload.output_dir || '', + format: payload.format || 'json', + include_avatar_link: payload.include_avatar_link == null ? true : !!payload.include_avatar_link, + keyword: payload.keyword || null, + contact_types: { + friends: payload?.contact_types?.friends == null ? true : !!payload.contact_types.friends, + groups: payload?.contact_types?.groups == null ? true : !!payload.contact_types.groups, + officials: payload?.contact_types?.officials == null ? true : !!payload.contact_types.officials, + } + } + }) + } + // WeChat Wrapped(年度总结) const getWrappedAnnual = async (params = {}) => { const query = new URLSearchParams() @@ -373,6 +404,8 @@ export const useApi = () => { getChatExport, listChatExports, cancelChatExport, + listChatContacts, + exportChatContacts, getWrappedAnnual, getWrappedAnnualMeta, getWrappedAnnualCard diff --git a/frontend/pages/contacts.vue b/frontend/pages/contacts.vue new file mode 100644 index 0000000..b72d6ea --- /dev/null +++ b/frontend/pages/contacts.vue @@ -0,0 +1,572 @@ + + + + + diff --git a/frontend/pages/sns.vue b/frontend/pages/sns.vue index 5582806..e8ad502 100644 --- a/frontend/pages/sns.vue +++ b/frontend/pages/sns.vue @@ -68,6 +68,26 @@ + +
+
+
+ +
+
+
+
route.path?.startsWith('/chat')) const isSnsRoute = computed(() => route.path?.startsWith('/sns')) +const isContactsRoute = computed(() => route.path?.startsWith('/contacts')) const isWrappedRoute = computed(() => route.path?.startsWith('/wrapped')) // 隐私模式(聊天/朋友圈共用本地开关) @@ -1051,6 +1072,10 @@ const goSns = async () => { await navigateTo('/sns') } +const goContacts = async () => { + await navigateTo('/contacts') +} + const goWrapped = async () => { await navigateTo('/wrapped') } diff --git a/src/wechat_decrypt_tool/api.py b/src/wechat_decrypt_tool/api.py index 5eeb36a..602e72f 100644 --- a/src/wechat_decrypt_tool/api.py +++ b/src/wechat_decrypt_tool/api.py @@ -13,6 +13,7 @@ from .logging_config import setup_logging, get_logger from .path_fix import PathFixRoute from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC from .routers.chat import router as _chat_router +from .routers.chat_contacts import router as _chat_contacts_router from .routers.chat_export import router as _chat_export_router from .routers.chat_media import router as _chat_media_router from .routers.decrypt import router as _decrypt_router @@ -52,6 +53,7 @@ app.include_router(_decrypt_router) app.include_router(_keys_router) app.include_router(_media_router) app.include_router(_chat_router) +app.include_router(_chat_contacts_router) app.include_router(_chat_export_router) app.include_router(_chat_media_router) app.include_router(_sns_router) diff --git a/src/wechat_decrypt_tool/routers/chat_contacts.py b/src/wechat_decrypt_tool/routers/chat_contacts.py new file mode 100644 index 0000000..3c94d2a --- /dev/null +++ b/src/wechat_decrypt_tool/routers/chat_contacts.py @@ -0,0 +1,749 @@ +import csv +import json +import re +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Literal, Optional + +from fastapi import APIRouter, HTTPException, Request +from pydantic import BaseModel, Field + +from ..chat_helpers import ( + _build_avatar_url, + _pick_avatar_url, + _pick_display_name, + _resolve_account_dir, + _should_keep_session, +) +from ..path_fix import PathFixRoute + +router = APIRouter(route_class=PathFixRoute) + + +_SYSTEM_USERNAMES = { + "filehelper", + "fmessage", + "floatbottle", + "medianote", + "newsapp", + "qmessage", + "qqmail", + "tmessage", + "brandsessionholder", + "brandservicesessionholder", + "notifymessage", + "opencustomerservicemsg", + "notification_messages", + "userexperience_alarm", +} + +_SOURCE_SCENE_LABELS = { + 1: "通过QQ号添加", + 3: "通过微信号添加", + 6: "通过手机号添加", + 10: "通过名片添加", + 14: "通过群聊添加", + 30: "通过扫一扫添加", +} + +_COUNTRY_LABELS = { + "CN": "中国大陆", +} + + +class ContactTypeFilter(BaseModel): + friends: bool = True + groups: bool = True + officials: bool = True + + +class ContactExportRequest(BaseModel): + account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)") + output_dir: str = Field(..., description="导出目录绝对路径") + format: str = Field("json", description="导出格式,仅支持 json/csv") + include_avatar_link: bool = Field(True, description="是否导出 avatarLink 字段") + contact_types: ContactTypeFilter = Field(default_factory=ContactTypeFilter) + keyword: Optional[str] = Field(None, description="关键词筛选(可选)") + + +def _normalize_text(v: Any) -> str: + if v is None: + return "" + return str(v).strip() + + +def _to_int(v: Any) -> int: + try: + return int(v or 0) + except Exception: + return 0 + + +def _to_optional_int(v: Any) -> Optional[int]: + if v is None: + return None + if isinstance(v, bool): + return int(v) + if isinstance(v, int): + return v + s = _normalize_text(v) + if not s: + return None + try: + return int(s) + except Exception: + return None + + +def _decode_varint(raw: bytes, offset: int) -> tuple[Optional[int], int]: + value = 0 + shift = 0 + pos = int(offset) + n = len(raw) + while pos < n: + byte = raw[pos] + pos += 1 + value |= (byte & 0x7F) << shift + if (byte & 0x80) == 0: + return value, pos + shift += 7 + if shift > 63: + return None, n + return None, n + + +def _decode_proto_text(raw: bytes) -> str: + if not raw: + return "" + try: + text = raw.decode("utf-8", errors="ignore") + except Exception: + return "" + return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text).strip() + + +def _parse_contact_extra_buffer(extra_buffer: Any) -> dict[str, Any]: + out = { + "signature": "", + "country": "", + "province": "", + "city": "", + "source_scene": None, + } + if extra_buffer is None: + return out + + raw: bytes + if isinstance(extra_buffer, memoryview): + raw = extra_buffer.tobytes() + elif isinstance(extra_buffer, (bytes, bytearray)): + raw = bytes(extra_buffer) + else: + return out + + if not raw: + return out + + idx = 0 + n = len(raw) + while idx < n: + tag, idx_next = _decode_varint(raw, idx) + if tag is None: + break + idx = idx_next + field_no = tag >> 3 + wire_type = tag & 0x7 + + if wire_type == 0: + val, idx_next = _decode_varint(raw, idx) + if val is None: + break + idx = idx_next + if field_no == 8: + out["source_scene"] = int(val) + continue + + if wire_type == 2: + size, idx_next = _decode_varint(raw, idx) + if size is None: + break + idx = idx_next + end = idx + int(size) + if end > n: + break + chunk = raw[idx:end] + idx = end + + if field_no in {4, 5, 6, 7}: + text = _decode_proto_text(chunk) + if field_no == 4: + out["signature"] = text + elif field_no == 5: + out["country"] = text + elif field_no == 6: + out["province"] = text + elif field_no == 7: + out["city"] = text + continue + + if wire_type == 1: + idx += 8 + continue + if wire_type == 5: + idx += 4 + continue + + break + + return out + + +def _country_label(country: str) -> str: + c = _normalize_text(country) + if not c: + return "" + return _COUNTRY_LABELS.get(c.upper(), c) + + +def _source_scene_label(source_scene: Optional[int]) -> str: + if source_scene is None: + return "" + if source_scene in _SOURCE_SCENE_LABELS: + return _SOURCE_SCENE_LABELS[source_scene] + return f"场景码 {source_scene}" + + +def _build_region(country: str, province: str, city: str) -> str: + parts: list[str] = [] + country_text = _country_label(country) + province_text = _normalize_text(province) + city_text = _normalize_text(city) + if country_text: + parts.append(country_text) + if province_text: + parts.append(province_text) + if city_text: + parts.append(city_text) + return "·".join(parts) + + +def _safe_export_part(s: str) -> str: + cleaned = re.sub(r"[^0-9A-Za-z._-]+", "_", str(s or "").strip()) + cleaned = cleaned.strip("._-") + return cleaned or "account" + + +def _is_valid_contact_username(username: str) -> bool: + u = _normalize_text(username) + if not u: + return False + if u in _SYSTEM_USERNAMES: + return False + if u.startswith("fake_"): + return False + if not _should_keep_session(u, include_official=True) and not u.startswith("gh_") and u != "weixin": + return False + return True + + +def _get_table_columns(conn: sqlite3.Connection, table: str) -> set[str]: + try: + rows = conn.execute(f"PRAGMA table_info({table})").fetchall() + except Exception: + return set() + + out: set[str] = set() + for row in rows: + try: + name = _normalize_text(row["name"] if "name" in row.keys() else row[1]).lower() + except Exception: + continue + if name: + out.add(name) + return out + + +def _build_contact_select_sql(table: str, columns: set[str]) -> Optional[str]: + if "username" not in columns: + return None + + specs: list[tuple[str, str, str]] = [ + ("username", "username", "''"), + ("remark", "remark", "''"), + ("nick_name", "nick_name", "''"), + ("alias", "alias", "''"), + ("local_type", "local_type", "0"), + ("verify_flag", "verify_flag", "0"), + ("big_head_url", "big_head_url", "''"), + ("small_head_url", "small_head_url", "''"), + ("extra_buffer", "extra_buffer", "x''"), + ] + + select_parts: list[str] = [] + for key, alias, fallback in specs: + if key in columns: + select_parts.append(key) + else: + select_parts.append(f"{fallback} AS {alias}") + return f"SELECT {', '.join(select_parts)} FROM {table}" + + +def _load_contact_rows_map(contact_db_path: Path) -> dict[str, dict[str, Any]]: + out: dict[str, dict[str, Any]] = {} + if not contact_db_path.exists(): + return out + + conn = sqlite3.connect(str(contact_db_path)) + conn.row_factory = sqlite3.Row + try: + def read_rows(table: str) -> list[sqlite3.Row]: + columns = _get_table_columns(conn, table) + sql = _build_contact_select_sql(table, columns) + if not sql: + return [] + try: + return conn.execute(sql).fetchall() + except Exception: + return [] + return [] + + for table in ("contact", "stranger"): + rows = read_rows(table) + for row in rows: + username = _normalize_text(row["username"] if "username" in row.keys() else "") + if (not username) or (username in out): + continue + + extra_info = _parse_contact_extra_buffer( + row["extra_buffer"] if "extra_buffer" in row.keys() else b"" + ) + out[username] = { + "username": username, + "remark": _normalize_text(row["remark"] if "remark" in row.keys() else ""), + "nick_name": _normalize_text(row["nick_name"] if "nick_name" in row.keys() else ""), + "alias": _normalize_text(row["alias"] if "alias" in row.keys() else ""), + "local_type": _to_int(row["local_type"] if "local_type" in row.keys() else 0), + "verify_flag": _to_int(row["verify_flag"] if "verify_flag" in row.keys() else 0), + "big_head_url": _normalize_text(row["big_head_url"] if "big_head_url" in row.keys() else ""), + "small_head_url": _normalize_text(row["small_head_url"] if "small_head_url" in row.keys() else ""), + "country": _normalize_text(extra_info.get("country")), + "province": _normalize_text(extra_info.get("province")), + "city": _normalize_text(extra_info.get("city")), + "source_scene": _to_optional_int(extra_info.get("source_scene")), + } + return out + finally: + conn.close() + + +def _load_session_sort_timestamps(session_db_path: Path) -> dict[str, int]: + out: dict[str, int] = {} + if not session_db_path.exists(): + return out + + conn = sqlite3.connect(str(session_db_path)) + conn.row_factory = sqlite3.Row + try: + rows: list[sqlite3.Row] = [] + queries = [ + "SELECT username, COALESCE(sort_timestamp, 0) AS ts FROM SessionTable", + "SELECT username, COALESCE(last_timestamp, 0) AS ts FROM SessionTable", + ] + for sql in queries: + try: + rows = conn.execute(sql).fetchall() + break + except Exception: + continue + + for row in rows: + username = _normalize_text(row["username"] if "username" in row.keys() else "") + if not username: + continue + ts = _to_int(row["ts"] if "ts" in row.keys() else 0) + prev = out.get(username, 0) + if ts > prev: + out[username] = ts + return out + finally: + conn.close() + + +def _load_session_group_usernames(session_db_path: Path) -> set[str]: + out: set[str] = set() + if not session_db_path.exists(): + return out + + conn = sqlite3.connect(str(session_db_path)) + conn.row_factory = sqlite3.Row + try: + queries = [ + "SELECT username FROM SessionTable", + "SELECT username FROM sessiontable", + ] + for sql in queries: + try: + rows = conn.execute(sql).fetchall() + except Exception: + continue + for row in rows: + username = _normalize_text(row["username"] if "username" in row.keys() else "") + if username and ("@chatroom" in username): + out.add(username) + return out + return out + finally: + conn.close() + + +def _infer_contact_type(username: str, row: dict[str, Any]) -> Optional[str]: + if not username: + return None + + if "@chatroom" in username: + return "group" + + verify_flag = _to_int(row.get("verify_flag")) + if username.startswith("gh_") or verify_flag != 0: + return "official" + + local_type = _to_int(row.get("local_type")) + if local_type == 1: + return "friend" + + return None + + +def _matches_keyword(contact: dict[str, Any], keyword: str) -> bool: + kw = _normalize_text(keyword).lower() + if not kw: + return True + + fields = [ + contact.get("username", ""), + contact.get("displayName", ""), + contact.get("remark", ""), + contact.get("nickname", ""), + contact.get("alias", ""), + contact.get("region", ""), + contact.get("source", ""), + contact.get("country", ""), + contact.get("province", ""), + contact.get("city", ""), + ] + for field in fields: + if kw in _normalize_text(field).lower(): + return True + return False + + +def _collect_contacts_for_account( + *, + account_dir: Path, + base_url: str, + keyword: Optional[str], + include_friends: bool, + include_groups: bool, + include_officials: bool, +) -> list[dict[str, Any]]: + if not (include_friends or include_groups or include_officials): + return [] + + contact_db_path = account_dir / "contact.db" + session_db_path = account_dir / "session.db" + contact_rows = _load_contact_rows_map(contact_db_path) + session_ts_map = _load_session_sort_timestamps(session_db_path) + session_group_usernames = _load_session_group_usernames(session_db_path) + + contacts: list[dict[str, Any]] = [] + for username, row in contact_rows.items(): + if not _is_valid_contact_username(username): + continue + + contact_type = _infer_contact_type(username, row) + if contact_type is None: + continue + if contact_type == "friend" and not include_friends: + continue + if contact_type == "group" and not include_groups: + continue + if contact_type == "official" and not include_officials: + continue + + display_name = _pick_display_name(row, username) + if not display_name: + display_name = username + + avatar_link = _normalize_text(_pick_avatar_url(row) or "") + avatar = base_url + _build_avatar_url(account_dir.name, username) + country = _normalize_text(row.get("country")) + province = _normalize_text(row.get("province")) + city = _normalize_text(row.get("city")) + source_scene = _to_optional_int(row.get("source_scene")) + + item = { + "username": username, + "displayName": display_name, + "remark": _normalize_text(row.get("remark")), + "nickname": _normalize_text(row.get("nick_name")), + "alias": _normalize_text(row.get("alias")), + "type": contact_type, + "country": country, + "province": province, + "city": city, + "region": _build_region(country, province, city), + "sourceScene": source_scene, + "source": _source_scene_label(source_scene), + "avatar": avatar, + "avatarLink": avatar_link, + "_sortTs": _to_int(session_ts_map.get(username, 0)), + } + + if not _matches_keyword(item, keyword or ""): + continue + contacts.append(item) + + if include_groups: + for username in session_group_usernames: + if username in contact_rows: + continue + if not _is_valid_contact_username(username): + continue + + avatar_link = "" + avatar = base_url + _build_avatar_url(account_dir.name, username) + + item = { + "username": username, + "displayName": username, + "remark": "", + "nickname": "", + "alias": "", + "type": "group", + "country": "", + "province": "", + "city": "", + "region": "", + "sourceScene": None, + "source": "", + "avatar": avatar, + "avatarLink": avatar_link, + "_sortTs": _to_int(session_ts_map.get(username, 0)), + } + + if not _matches_keyword(item, keyword or ""): + continue + contacts.append(item) + + contacts.sort( + key=lambda x: ( + -_to_int(x.get("_sortTs", 0)), + _normalize_text(x.get("displayName", "")).lower(), + _normalize_text(x.get("username", "")).lower(), + ) + ) + for item in contacts: + item.pop("_sortTs", None) + return contacts + + +def _build_counts(contacts: list[dict[str, Any]]) -> dict[str, int]: + counts = { + "friends": 0, + "groups": 0, + "officials": 0, + "total": 0, + } + for item in contacts: + t = _normalize_text(item.get("type")) + if t == "friend": + counts["friends"] += 1 + elif t == "group": + counts["groups"] += 1 + elif t == "official": + counts["officials"] += 1 + counts["total"] = len(contacts) + return counts + + +def _build_export_contacts( + contacts: list[dict[str, Any]], + *, + include_avatar_link: bool, +) -> list[dict[str, Any]]: + out: list[dict[str, Any]] = [] + for item in contacts: + row = { + "username": _normalize_text(item.get("username")), + "displayName": _normalize_text(item.get("displayName")), + "remark": _normalize_text(item.get("remark")), + "nickname": _normalize_text(item.get("nickname")), + "alias": _normalize_text(item.get("alias")), + "type": _normalize_text(item.get("type")), + "region": _normalize_text(item.get("region")), + "country": _normalize_text(item.get("country")), + "province": _normalize_text(item.get("province")), + "city": _normalize_text(item.get("city")), + "source": _normalize_text(item.get("source")), + "sourceScene": _to_optional_int(item.get("sourceScene")), + } + if include_avatar_link: + row["avatarLink"] = _normalize_text(item.get("avatarLink")) + out.append(row) + return out + + +def _write_json_export( + output_path: Path, + *, + account: str, + contacts: list[dict[str, Any]], + include_avatar_link: bool, + keyword: str, + contact_types: ContactTypeFilter, +) -> None: + payload = { + "exportedAt": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "account": account, + "count": len(contacts), + "filters": { + "keyword": keyword, + "contactTypes": { + "friends": bool(contact_types.friends), + "groups": bool(contact_types.groups), + "officials": bool(contact_types.officials), + }, + "includeAvatarLink": bool(include_avatar_link), + }, + "contacts": contacts, + } + output_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + +def _write_csv_export( + output_path: Path, + *, + contacts: list[dict[str, Any]], + include_avatar_link: bool, +) -> None: + columns: list[tuple[str, str]] = [ + ("username", "用户名"), + ("displayName", "显示名称"), + ("remark", "备注"), + ("nickname", "昵称"), + ("alias", "微信号"), + ("type", "类型"), + ("region", "地区"), + ("country", "国家/地区码"), + ("province", "省份"), + ("city", "城市"), + ("source", "来源"), + ("sourceScene", "来源场景码"), + ] + if include_avatar_link: + columns.append(("avatarLink", "头像链接")) + + with output_path.open("w", encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + writer.writerow([label for _, label in columns]) + for item in contacts: + writer.writerow([_normalize_text(item.get(key, "")) for key, _ in columns]) + + +@router.get("/api/chat/contacts", summary="获取联系人列表") +def list_chat_contacts( + request: Request, + account: Optional[str] = None, + keyword: Optional[str] = None, + include_friends: bool = True, + include_groups: bool = True, + include_officials: bool = True, +): + account_dir = _resolve_account_dir(account) + base_url = str(request.base_url).rstrip("/") + + contacts = _collect_contacts_for_account( + account_dir=account_dir, + base_url=base_url, + keyword=keyword, + include_friends=bool(include_friends), + include_groups=bool(include_groups), + include_officials=bool(include_officials), + ) + + return { + "status": "success", + "account": account_dir.name, + "total": len(contacts), + "counts": _build_counts(contacts), + "contacts": contacts, + } + + +@router.post("/api/chat/contacts/export", summary="导出联系人") +def export_chat_contacts(request: Request, req: ContactExportRequest): + account_dir = _resolve_account_dir(req.account) + + output_dir_raw = _normalize_text(req.output_dir) + if not output_dir_raw: + raise HTTPException(status_code=400, detail="output_dir is required.") + + output_dir = Path(output_dir_raw).expanduser() + if not output_dir.is_absolute(): + raise HTTPException(status_code=400, detail="output_dir must be an absolute path.") + + try: + output_dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + raise HTTPException(status_code=400, detail=f"Failed to prepare output_dir: {e}") + + base_url = str(request.base_url).rstrip("/") + contacts = _collect_contacts_for_account( + account_dir=account_dir, + base_url=base_url, + keyword=req.keyword, + include_friends=bool(req.contact_types.friends), + include_groups=bool(req.contact_types.groups), + include_officials=bool(req.contact_types.officials), + ) + + export_contacts = _build_export_contacts( + contacts, + include_avatar_link=bool(req.include_avatar_link), + ) + + fmt = _normalize_text(req.format).lower() + if fmt not in {"json", "csv"}: + raise HTTPException(status_code=400, detail="Unsupported format, use 'json' or 'csv'.") + + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + safe_account = _safe_export_part(account_dir.name) + output_path = output_dir / f"contacts_{safe_account}_{ts}.{fmt}" + + try: + if fmt == "json": + _write_json_export( + output_path, + account=account_dir.name, + contacts=export_contacts, + include_avatar_link=bool(req.include_avatar_link), + keyword=_normalize_text(req.keyword), + contact_types=req.contact_types, + ) + else: + _write_csv_export( + output_path, + contacts=export_contacts, + include_avatar_link=bool(req.include_avatar_link), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to export contacts: {e}") + + return { + "status": "success", + "account": account_dir.name, + "format": fmt, + "outputPath": str(output_path), + "count": len(export_contacts), + } diff --git a/tests/test_contact_type_detection.py b/tests/test_contact_type_detection.py new file mode 100644 index 0000000..df38af4 --- /dev/null +++ b/tests/test_contact_type_detection.py @@ -0,0 +1,71 @@ +import sys +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestContactTypeDetection(unittest.TestCase): + def test_infer_group(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 0, "alias": "", "remark": "", "nick_name": ""} + self.assertEqual(_infer_contact_type("123@chatroom", row), "group") + + def test_infer_official_by_prefix(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 0, "verify_flag": 0, "alias": "", "remark": "", "nick_name": ""} + self.assertEqual(_infer_contact_type("gh_xxx", row), "official") + + def test_infer_official_by_verify_flag(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 1, "verify_flag": 24, "alias": "", "remark": "", "nick_name": ""} + self.assertEqual(_infer_contact_type("wxid_xxx", row), "official") + + def test_infer_none_for_local_type_3_without_verify(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 3, "verify_flag": 0, "alias": "", "remark": "", "nick_name": "普通联系人"} + self.assertIsNone(_infer_contact_type("wxid_xxx", row)) + + def test_infer_none_from_wxid_alias_when_local_type_not_1(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 0, "verify_flag": 0, "alias": "wechat_id", "remark": "", "nick_name": ""} + self.assertIsNone(_infer_contact_type("wxid_xxx", row)) + + def test_infer_friend_from_local_type_1(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 1, "verify_flag": 0, "alias": "", "remark": "", "nick_name": ""} + self.assertEqual(_infer_contact_type("wxid_xxx", row), "friend") + + def test_infer_none_from_local_type_2(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 2, "verify_flag": 0, "alias": "", "remark": "", "nick_name": ""} + self.assertIsNone(_infer_contact_type("wxid_xxx", row)) + + def test_infer_none_when_empty_type_0(self): + from wechat_decrypt_tool.routers.chat_contacts import _infer_contact_type + + row = {"local_type": 0, "verify_flag": 0, "alias": "", "remark": "", "nick_name": ""} + self.assertIsNone(_infer_contact_type("wxid_xxx", row)) + + def test_valid_contact_username_filters_system_accounts(self): + from wechat_decrypt_tool.routers.chat_contacts import _is_valid_contact_username + + self.assertFalse(_is_valid_contact_username("filehelper")) + self.assertFalse(_is_valid_contact_username("notifymessage")) + self.assertFalse(_is_valid_contact_username("fake_abc")) + self.assertTrue(_is_valid_contact_username("weixin")) + self.assertTrue(_is_valid_contact_username("wxid_abc")) + self.assertTrue(_is_valid_contact_username("123@chatroom")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_contacts_export.py b/tests/test_contacts_export.py new file mode 100644 index 0000000..4b6dcb2 --- /dev/null +++ b/tests/test_contacts_export.py @@ -0,0 +1,546 @@ +import json +import os +import sqlite3 +import sys +import unittest +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestContactsExport(unittest.TestCase): + @staticmethod + def _encode_varint(value: int) -> bytes: + v = int(value) + out = bytearray() + while True: + b = v & 0x7F + v >>= 7 + if v: + out.append(b | 0x80) + else: + out.append(b) + break + return bytes(out) + + @classmethod + def _encode_field_len(cls, field_no: int, raw: bytes) -> bytes: + tag = (int(field_no) << 3) | 2 + payload = bytes(raw) + return cls._encode_varint(tag) + cls._encode_varint(len(payload)) + payload + + @classmethod + def _encode_field_varint(cls, field_no: int, value: int) -> bytes: + tag = int(field_no) << 3 + return cls._encode_varint(tag) + cls._encode_varint(int(value)) + + @classmethod + def _build_extra_buffer(cls, *, country: str, province: str, city: str, source_scene: int) -> bytes: + return b"".join( + [ + cls._encode_field_len(5, country.encode("utf-8")), + cls._encode_field_len(6, province.encode("utf-8")), + cls._encode_field_len(7, city.encode("utf-8")), + cls._encode_field_varint(8, source_scene), + ] + ) + + def _seed_contact_db(self, path: Path) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT, + extra_buffer BLOB + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT, + extra_buffer BLOB + ) + """ + ) + + friend_extra_buffer = self._build_extra_buffer( + country="CN", + province="Sichuan", + city="Chengdu", + source_scene=14, + ) + + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "wxid_friend", + "好友备注", + "好友昵称", + "friend_alias", + 1, + 0, + "https://cdn.example.com/friend_big.jpg", + "https://cdn.example.com/friend_small.jpg", + friend_extra_buffer, + ), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "room@chatroom", + "", + "测试群", + "", + 0, + 0, + "https://cdn.example.com/group_big.jpg", + "", + b"", + ), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "gh_official", + "", + "公众号", + "", + 4, + 8, + "", + "https://cdn.example.com/official_small.jpg", + b"", + ), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "wxid_local_type_3", + "", + "不应计入联系人", + "", + 3, + 0, + "", + "", + b"", + ), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "weixin", + "", + "微信团队", + "", + 1, + 56, + "", + "", + b"", + ), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "filehelper", + "", + "文件传输助手", + "", + 0, + 0, + "", + "", + b"", + ), + ) + conn.execute( + "INSERT INTO stranger VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + "stranger_verified", + "", + "陌生人认证号", + "", + 4, + 24, + "", + "", + b"", + ), + ) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + sort_timestamp INTEGER, + last_timestamp INTEGER + ) + """ + ) + conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("room@chatroom", 300, 300)) + conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_friend", 200, 200)) + conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("gh_official", 100, 100)) + conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("missing@chatroom", 250, 250)) + conn.commit() + finally: + conn.close() + + def _seed_contact_db_legacy(self, path: Path) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + "wxid_legacy_friend", + "旧版好友备注", + "旧版好友昵称", + "legacy_friend_alias", + 1, + 0, + "", + "", + ), + ) + conn.commit() + finally: + conn.close() + + def test_export_json_and_csv(self): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db(account_dir / "contact.db") + self._seed_session_db(account_dir / "session.db") + + prev = None + try: + prev = os.environ.get("WECHAT_TOOL_DATA_DIR") + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.routers.chat_contacts as chat_contacts + + importlib.reload(chat_helpers) + importlib.reload(chat_contacts) + + app = FastAPI() + app.include_router(chat_contacts.router) + + client = TestClient(app) + + list_resp = client.get( + "/api/chat/contacts", + params={ + "account": account, + "include_friends": True, + "include_groups": True, + "include_officials": True, + }, + ) + self.assertEqual(list_resp.status_code, 200) + list_payload = list_resp.json() + self.assertEqual(list_payload["status"], "success") + self.assertEqual(list_payload["total"], 6) + self.assertEqual(list_payload["counts"]["friends"], 1) + self.assertEqual(list_payload["counts"]["groups"], 2) + self.assertEqual(list_payload["counts"]["officials"], 3) + usernames = {str(x.get("username")) for x in list_payload.get("contacts", [])} + self.assertIn("missing@chatroom", usernames) + self.assertIn("weixin", usernames) + self.assertNotIn("wxid_local_type_3", usernames) + first = list_payload["contacts"][0] + self.assertIn("avatarLink", first) + + friend_contact = next( + (x for x in list_payload.get("contacts", []) if str(x.get("username")) == "wxid_friend"), + {}, + ) + self.assertEqual(friend_contact.get("country"), "CN") + self.assertEqual(friend_contact.get("province"), "Sichuan") + self.assertEqual(friend_contact.get("city"), "Chengdu") + self.assertEqual(friend_contact.get("region"), "中国大陆·Sichuan·Chengdu") + self.assertEqual(friend_contact.get("sourceScene"), 14) + self.assertEqual(friend_contact.get("source"), "通过群聊添加") + + export_dir = root / "exports" + export_dir.mkdir(parents=True, exist_ok=True) + + json_resp = client.post( + "/api/chat/contacts/export", + json={ + "account": account, + "output_dir": str(export_dir), + "format": "json", + "include_avatar_link": True, + "contact_types": { + "friends": True, + "groups": True, + "officials": True, + }, + }, + ) + self.assertEqual(json_resp.status_code, 200) + json_payload = json_resp.json() + self.assertEqual(json_payload["status"], "success") + self.assertEqual(json_payload["count"], 6) + json_path = Path(json_payload["outputPath"]) + self.assertTrue(json_path.exists()) + + data = json.loads(json_path.read_text(encoding="utf-8")) + self.assertEqual(data["count"], 6) + self.assertIn("avatarLink", data["contacts"][0]) + self.assertIn("region", data["contacts"][0]) + self.assertIn("country", data["contacts"][0]) + self.assertIn("province", data["contacts"][0]) + self.assertIn("city", data["contacts"][0]) + self.assertIn("source", data["contacts"][0]) + self.assertIn("sourceScene", data["contacts"][0]) + export_usernames = {str(x.get("username")) for x in data.get("contacts", [])} + self.assertIn("missing@chatroom", export_usernames) + self.assertNotIn("wxid_local_type_3", export_usernames) + + friend_export = next( + (x for x in data.get("contacts", []) if str(x.get("username")) == "wxid_friend"), + {}, + ) + self.assertEqual(friend_export.get("region"), "中国大陆·Sichuan·Chengdu") + self.assertEqual(friend_export.get("sourceScene"), 14) + self.assertEqual(friend_export.get("source"), "通过群聊添加") + + csv_resp = client.post( + "/api/chat/contacts/export", + json={ + "account": account, + "output_dir": str(export_dir), + "format": "csv", + "include_avatar_link": False, + "contact_types": { + "friends": True, + "groups": False, + "officials": False, + }, + }, + ) + self.assertEqual(csv_resp.status_code, 200) + csv_payload = csv_resp.json() + self.assertEqual(csv_payload["count"], 1) + csv_path = Path(csv_payload["outputPath"]) + text = csv_path.read_text(encoding="utf-8-sig") + self.assertIn("用户名,显示名称,备注,昵称,微信号,类型,地区,国家/地区码,省份,城市,来源,来源场景码", text.splitlines()[0]) + self.assertNotIn("头像链接", text.splitlines()[0]) + self.assertIn("wxid_friend", text) + self.assertIn("中国大陆·Sichuan·Chengdu", text) + self.assertIn("通过群聊添加", text) + self.assertIn(",14", text) + self.assertNotIn("wxid_local_type_3", text) + finally: + if prev is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev + + def test_export_invalid_format_returns_400(self): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db(account_dir / "contact.db") + self._seed_session_db(account_dir / "session.db") + + prev = None + try: + prev = os.environ.get("WECHAT_TOOL_DATA_DIR") + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.routers.chat_contacts as chat_contacts + + importlib.reload(chat_helpers) + importlib.reload(chat_contacts) + + app = FastAPI() + app.include_router(chat_contacts.router) + + client = TestClient(app) + resp = client.post( + "/api/chat/contacts/export", + json={ + "account": account, + "output_dir": str(root / "exports"), + "format": "vcf", + "include_avatar_link": True, + "contact_types": { + "friends": True, + "groups": True, + "officials": True, + }, + }, + ) + self.assertEqual(resp.status_code, 400) + finally: + if prev is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev + + def test_missing_contact_db_returns_404(self): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + # only session.db exists + self._seed_session_db(account_dir / "session.db") + + prev = None + try: + prev = os.environ.get("WECHAT_TOOL_DATA_DIR") + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.routers.chat_contacts as chat_contacts + + importlib.reload(chat_helpers) + importlib.reload(chat_contacts) + + app = FastAPI() + app.include_router(chat_contacts.router) + client = TestClient(app) + + resp = client.get("/api/chat/contacts", params={"account": account}) + self.assertEqual(resp.status_code, 404) + finally: + if prev is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev + + def test_legacy_schema_without_extra_buffer_is_compatible(self): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_legacy" + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db_legacy(account_dir / "contact.db") + self._seed_session_db(account_dir / "session.db") + + prev = None + try: + prev = os.environ.get("WECHAT_TOOL_DATA_DIR") + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.routers.chat_contacts as chat_contacts + + importlib.reload(chat_helpers) + importlib.reload(chat_contacts) + + app = FastAPI() + app.include_router(chat_contacts.router) + client = TestClient(app) + + resp = client.get( + "/api/chat/contacts", + params={ + "account": account, + "include_friends": True, + "include_groups": False, + "include_officials": False, + }, + ) + self.assertEqual(resp.status_code, 200) + payload = resp.json() + self.assertEqual(payload.get("status"), "success") + self.assertEqual(int(payload.get("total", 0)), 1) + + contact = payload.get("contacts", [])[0] + self.assertEqual(contact.get("username"), "wxid_legacy_friend") + self.assertEqual(contact.get("country"), "") + self.assertEqual(contact.get("province"), "") + self.assertEqual(contact.get("city"), "") + self.assertEqual(contact.get("region"), "") + self.assertIsNone(contact.get("sourceScene")) + self.assertEqual(contact.get("source"), "") + finally: + if prev is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev + + +if __name__ == "__main__": + unittest.main()