mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-02-19 22:30:49 +08:00
- 解析 extra_buffer 补齐 gender/signature\n- 返回 pinyinKey/pinyinInitial,前端按 A-Z/# 分组排序展示\n- tests: 更新联系人导出用例覆盖新增字段
562 lines
20 KiB
Python
562 lines
20 KiB
Python
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import unittest
|
|
import importlib
|
|
from pathlib import Path
|
|
from tempfile import TemporaryDirectory
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
sys.path.insert(0, str(ROOT / "src"))
|
|
|
|
|
|
class TestContactsExport(unittest.TestCase):
|
|
@staticmethod
|
|
def _encode_varint(value: int) -> bytes:
|
|
v = int(value)
|
|
out = bytearray()
|
|
while True:
|
|
b = v & 0x7F
|
|
v >>= 7
|
|
if v:
|
|
out.append(b | 0x80)
|
|
else:
|
|
out.append(b)
|
|
break
|
|
return bytes(out)
|
|
|
|
@classmethod
|
|
def _encode_field_len(cls, field_no: int, raw: bytes) -> bytes:
|
|
tag = (int(field_no) << 3) | 2
|
|
payload = bytes(raw)
|
|
return cls._encode_varint(tag) + cls._encode_varint(len(payload)) + payload
|
|
|
|
@classmethod
|
|
def _encode_field_varint(cls, field_no: int, value: int) -> bytes:
|
|
tag = int(field_no) << 3
|
|
return cls._encode_varint(tag) + cls._encode_varint(int(value))
|
|
|
|
@classmethod
|
|
def _build_extra_buffer(
|
|
cls,
|
|
*,
|
|
country: str,
|
|
province: str,
|
|
city: str,
|
|
source_scene: int,
|
|
gender: int = 0,
|
|
signature: str = "",
|
|
) -> bytes:
|
|
return b"".join(
|
|
[
|
|
cls._encode_field_varint(2, gender),
|
|
cls._encode_field_len(4, signature.encode("utf-8")),
|
|
cls._encode_field_len(5, country.encode("utf-8")),
|
|
cls._encode_field_len(6, province.encode("utf-8")),
|
|
cls._encode_field_len(7, city.encode("utf-8")),
|
|
cls._encode_field_varint(8, source_scene),
|
|
]
|
|
)
|
|
|
|
def _seed_contact_db(self, path: Path) -> None:
|
|
conn = sqlite3.connect(str(path))
|
|
try:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE contact (
|
|
username TEXT,
|
|
remark TEXT,
|
|
nick_name TEXT,
|
|
alias TEXT,
|
|
local_type INTEGER,
|
|
verify_flag INTEGER,
|
|
big_head_url TEXT,
|
|
small_head_url TEXT,
|
|
extra_buffer BLOB
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE stranger (
|
|
username TEXT,
|
|
remark TEXT,
|
|
nick_name TEXT,
|
|
alias TEXT,
|
|
local_type INTEGER,
|
|
verify_flag INTEGER,
|
|
big_head_url TEXT,
|
|
small_head_url TEXT,
|
|
extra_buffer BLOB
|
|
)
|
|
"""
|
|
)
|
|
|
|
friend_extra_buffer = self._build_extra_buffer(
|
|
country="CN",
|
|
province="Sichuan",
|
|
city="Chengdu",
|
|
source_scene=14,
|
|
gender=1,
|
|
signature="自助者天助!!!",
|
|
)
|
|
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"wxid_friend",
|
|
"好友备注",
|
|
"好友昵称",
|
|
"friend_alias",
|
|
1,
|
|
0,
|
|
"https://cdn.example.com/friend_big.jpg",
|
|
"https://cdn.example.com/friend_small.jpg",
|
|
friend_extra_buffer,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"room@chatroom",
|
|
"",
|
|
"测试群",
|
|
"",
|
|
0,
|
|
0,
|
|
"https://cdn.example.com/group_big.jpg",
|
|
"",
|
|
b"",
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"gh_official",
|
|
"",
|
|
"公众号",
|
|
"",
|
|
4,
|
|
8,
|
|
"",
|
|
"https://cdn.example.com/official_small.jpg",
|
|
b"",
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"wxid_local_type_3",
|
|
"",
|
|
"不应计入联系人",
|
|
"",
|
|
3,
|
|
0,
|
|
"",
|
|
"",
|
|
b"",
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"weixin",
|
|
"",
|
|
"微信团队",
|
|
"",
|
|
1,
|
|
56,
|
|
"",
|
|
"",
|
|
b"",
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"filehelper",
|
|
"",
|
|
"文件传输助手",
|
|
"",
|
|
0,
|
|
0,
|
|
"",
|
|
"",
|
|
b"",
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO stranger VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"stranger_verified",
|
|
"",
|
|
"陌生人认证号",
|
|
"",
|
|
4,
|
|
24,
|
|
"",
|
|
"",
|
|
b"",
|
|
),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def _seed_session_db(self, path: Path) -> None:
|
|
conn = sqlite3.connect(str(path))
|
|
try:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE SessionTable (
|
|
username TEXT,
|
|
sort_timestamp INTEGER,
|
|
last_timestamp INTEGER
|
|
)
|
|
"""
|
|
)
|
|
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("room@chatroom", 300, 300))
|
|
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_friend", 200, 200))
|
|
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("gh_official", 100, 100))
|
|
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("missing@chatroom", 250, 250))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def _seed_contact_db_legacy(self, path: Path) -> None:
|
|
conn = sqlite3.connect(str(path))
|
|
try:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE contact (
|
|
username TEXT,
|
|
remark TEXT,
|
|
nick_name TEXT,
|
|
alias TEXT,
|
|
local_type INTEGER,
|
|
verify_flag INTEGER,
|
|
big_head_url TEXT,
|
|
small_head_url TEXT
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE stranger (
|
|
username TEXT,
|
|
remark TEXT,
|
|
nick_name TEXT,
|
|
alias TEXT,
|
|
local_type INTEGER,
|
|
verify_flag INTEGER,
|
|
big_head_url TEXT,
|
|
small_head_url TEXT
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
"wxid_legacy_friend",
|
|
"旧版好友备注",
|
|
"旧版好友昵称",
|
|
"legacy_friend_alias",
|
|
1,
|
|
0,
|
|
"",
|
|
"",
|
|
),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_export_json_and_csv(self):
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account = "wxid_test"
|
|
account_dir = root / "output" / "databases" / account
|
|
account_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._seed_contact_db(account_dir / "contact.db")
|
|
self._seed_session_db(account_dir / "session.db")
|
|
|
|
prev = None
|
|
try:
|
|
prev = os.environ.get("WECHAT_TOOL_DATA_DIR")
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
|
|
|
|
import wechat_decrypt_tool.chat_helpers as chat_helpers
|
|
import wechat_decrypt_tool.routers.chat_contacts as chat_contacts
|
|
|
|
importlib.reload(chat_helpers)
|
|
importlib.reload(chat_contacts)
|
|
|
|
app = FastAPI()
|
|
app.include_router(chat_contacts.router)
|
|
|
|
client = TestClient(app)
|
|
|
|
list_resp = client.get(
|
|
"/api/chat/contacts",
|
|
params={
|
|
"account": account,
|
|
"include_friends": True,
|
|
"include_groups": True,
|
|
"include_officials": True,
|
|
},
|
|
)
|
|
self.assertEqual(list_resp.status_code, 200)
|
|
list_payload = list_resp.json()
|
|
self.assertEqual(list_payload["status"], "success")
|
|
self.assertEqual(list_payload["total"], 6)
|
|
self.assertEqual(list_payload["counts"]["friends"], 1)
|
|
self.assertEqual(list_payload["counts"]["groups"], 2)
|
|
self.assertEqual(list_payload["counts"]["officials"], 3)
|
|
usernames = {str(x.get("username")) for x in list_payload.get("contacts", [])}
|
|
self.assertIn("missing@chatroom", usernames)
|
|
self.assertIn("weixin", usernames)
|
|
self.assertNotIn("wxid_local_type_3", usernames)
|
|
first = list_payload["contacts"][0]
|
|
self.assertIn("avatarLink", first)
|
|
|
|
friend_contact = next(
|
|
(x for x in list_payload.get("contacts", []) if str(x.get("username")) == "wxid_friend"),
|
|
{},
|
|
)
|
|
self.assertEqual(friend_contact.get("country"), "CN")
|
|
self.assertEqual(friend_contact.get("province"), "Sichuan")
|
|
self.assertEqual(friend_contact.get("city"), "Chengdu")
|
|
self.assertEqual(friend_contact.get("region"), "中国大陆·Sichuan·Chengdu")
|
|
self.assertEqual(friend_contact.get("gender"), 1)
|
|
self.assertEqual(friend_contact.get("signature"), "自助者天助!!!")
|
|
self.assertEqual(friend_contact.get("sourceScene"), 14)
|
|
self.assertEqual(friend_contact.get("source"), "通过群聊添加")
|
|
|
|
export_dir = root / "exports"
|
|
export_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
json_resp = client.post(
|
|
"/api/chat/contacts/export",
|
|
json={
|
|
"account": account,
|
|
"output_dir": str(export_dir),
|
|
"format": "json",
|
|
"include_avatar_link": True,
|
|
"contact_types": {
|
|
"friends": True,
|
|
"groups": True,
|
|
"officials": True,
|
|
},
|
|
},
|
|
)
|
|
self.assertEqual(json_resp.status_code, 200)
|
|
json_payload = json_resp.json()
|
|
self.assertEqual(json_payload["status"], "success")
|
|
self.assertEqual(json_payload["count"], 6)
|
|
json_path = Path(json_payload["outputPath"])
|
|
self.assertTrue(json_path.exists())
|
|
|
|
data = json.loads(json_path.read_text(encoding="utf-8"))
|
|
self.assertEqual(data["count"], 6)
|
|
self.assertIn("avatarLink", data["contacts"][0])
|
|
self.assertIn("region", data["contacts"][0])
|
|
self.assertIn("country", data["contacts"][0])
|
|
self.assertIn("province", data["contacts"][0])
|
|
self.assertIn("city", data["contacts"][0])
|
|
self.assertIn("source", data["contacts"][0])
|
|
self.assertIn("sourceScene", data["contacts"][0])
|
|
export_usernames = {str(x.get("username")) for x in data.get("contacts", [])}
|
|
self.assertIn("missing@chatroom", export_usernames)
|
|
self.assertNotIn("wxid_local_type_3", export_usernames)
|
|
|
|
friend_export = next(
|
|
(x for x in data.get("contacts", []) if str(x.get("username")) == "wxid_friend"),
|
|
{},
|
|
)
|
|
self.assertEqual(friend_export.get("region"), "中国大陆·Sichuan·Chengdu")
|
|
self.assertEqual(friend_export.get("sourceScene"), 14)
|
|
self.assertEqual(friend_export.get("source"), "通过群聊添加")
|
|
|
|
csv_resp = client.post(
|
|
"/api/chat/contacts/export",
|
|
json={
|
|
"account": account,
|
|
"output_dir": str(export_dir),
|
|
"format": "csv",
|
|
"include_avatar_link": False,
|
|
"contact_types": {
|
|
"friends": True,
|
|
"groups": False,
|
|
"officials": False,
|
|
},
|
|
},
|
|
)
|
|
self.assertEqual(csv_resp.status_code, 200)
|
|
csv_payload = csv_resp.json()
|
|
self.assertEqual(csv_payload["count"], 1)
|
|
csv_path = Path(csv_payload["outputPath"])
|
|
text = csv_path.read_text(encoding="utf-8-sig")
|
|
self.assertIn("用户名,显示名称,备注,昵称,微信号,类型,地区,国家/地区码,省份,城市,来源,来源场景码", text.splitlines()[0])
|
|
self.assertNotIn("头像链接", text.splitlines()[0])
|
|
self.assertIn("wxid_friend", text)
|
|
self.assertIn("中国大陆·Sichuan·Chengdu", text)
|
|
self.assertIn("通过群聊添加", text)
|
|
self.assertIn(",14", text)
|
|
self.assertNotIn("wxid_local_type_3", text)
|
|
finally:
|
|
if prev is None:
|
|
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
|
|
else:
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = prev
|
|
|
|
def test_export_invalid_format_returns_400(self):
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account = "wxid_test"
|
|
account_dir = root / "output" / "databases" / account
|
|
account_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._seed_contact_db(account_dir / "contact.db")
|
|
self._seed_session_db(account_dir / "session.db")
|
|
|
|
prev = None
|
|
try:
|
|
prev = os.environ.get("WECHAT_TOOL_DATA_DIR")
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
|
|
|
|
import wechat_decrypt_tool.chat_helpers as chat_helpers
|
|
import wechat_decrypt_tool.routers.chat_contacts as chat_contacts
|
|
|
|
importlib.reload(chat_helpers)
|
|
importlib.reload(chat_contacts)
|
|
|
|
app = FastAPI()
|
|
app.include_router(chat_contacts.router)
|
|
|
|
client = TestClient(app)
|
|
resp = client.post(
|
|
"/api/chat/contacts/export",
|
|
json={
|
|
"account": account,
|
|
"output_dir": str(root / "exports"),
|
|
"format": "vcf",
|
|
"include_avatar_link": True,
|
|
"contact_types": {
|
|
"friends": True,
|
|
"groups": True,
|
|
"officials": True,
|
|
},
|
|
},
|
|
)
|
|
self.assertEqual(resp.status_code, 400)
|
|
finally:
|
|
if prev is None:
|
|
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
|
|
else:
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = prev
|
|
|
|
def test_missing_contact_db_returns_404(self):
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account = "wxid_test"
|
|
account_dir = root / "output" / "databases" / account
|
|
account_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# only session.db exists
|
|
self._seed_session_db(account_dir / "session.db")
|
|
|
|
prev = None
|
|
try:
|
|
prev = os.environ.get("WECHAT_TOOL_DATA_DIR")
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
|
|
|
|
import wechat_decrypt_tool.chat_helpers as chat_helpers
|
|
import wechat_decrypt_tool.routers.chat_contacts as chat_contacts
|
|
|
|
importlib.reload(chat_helpers)
|
|
importlib.reload(chat_contacts)
|
|
|
|
app = FastAPI()
|
|
app.include_router(chat_contacts.router)
|
|
client = TestClient(app)
|
|
|
|
resp = client.get("/api/chat/contacts", params={"account": account})
|
|
self.assertEqual(resp.status_code, 404)
|
|
finally:
|
|
if prev is None:
|
|
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
|
|
else:
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = prev
|
|
|
|
def test_legacy_schema_without_extra_buffer_is_compatible(self):
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account = "wxid_legacy"
|
|
account_dir = root / "output" / "databases" / account
|
|
account_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._seed_contact_db_legacy(account_dir / "contact.db")
|
|
self._seed_session_db(account_dir / "session.db")
|
|
|
|
prev = None
|
|
try:
|
|
prev = os.environ.get("WECHAT_TOOL_DATA_DIR")
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
|
|
|
|
import wechat_decrypt_tool.chat_helpers as chat_helpers
|
|
import wechat_decrypt_tool.routers.chat_contacts as chat_contacts
|
|
|
|
importlib.reload(chat_helpers)
|
|
importlib.reload(chat_contacts)
|
|
|
|
app = FastAPI()
|
|
app.include_router(chat_contacts.router)
|
|
client = TestClient(app)
|
|
|
|
resp = client.get(
|
|
"/api/chat/contacts",
|
|
params={
|
|
"account": account,
|
|
"include_friends": True,
|
|
"include_groups": False,
|
|
"include_officials": False,
|
|
},
|
|
)
|
|
self.assertEqual(resp.status_code, 200)
|
|
payload = resp.json()
|
|
self.assertEqual(payload.get("status"), "success")
|
|
self.assertEqual(int(payload.get("total", 0)), 1)
|
|
|
|
contact = payload.get("contacts", [])[0]
|
|
self.assertEqual(contact.get("username"), "wxid_legacy_friend")
|
|
self.assertEqual(contact.get("country"), "")
|
|
self.assertEqual(contact.get("province"), "")
|
|
self.assertEqual(contact.get("city"), "")
|
|
self.assertEqual(contact.get("region"), "")
|
|
self.assertIsNone(contact.get("sourceScene"))
|
|
self.assertEqual(contact.get("source"), "")
|
|
finally:
|
|
if prev is None:
|
|
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
|
|
else:
|
|
os.environ["WECHAT_TOOL_DATA_DIR"] = prev
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|