mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
23932bf89c
- 解密页新增表情下载步骤,支持开始/停止、进度展示和结果统计\n- 图片解密与表情下载接口支持并发配置,补充 SSE 进度与结果信息\n- 增加表情目录聚合、缓存校验与媒体下载相关测试
135 lines
6.9 KiB
Python
135 lines
6.9 KiB
Python
import asyncio
|
|
import json
|
|
import sys
|
|
import unittest
|
|
from pathlib import Path
|
|
from tempfile import TemporaryDirectory
|
|
from unittest import mock
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
sys.path.insert(0, str(ROOT / "src"))
|
|
|
|
|
|
from wechat_decrypt_tool.routers import media as media_router # noqa: E402 pylint: disable=wrong-import-position
|
|
|
|
|
|
class _FakeDisconnectingRequest:
|
|
def __init__(self, disconnect_after: int):
|
|
self._disconnect_after = disconnect_after
|
|
self._calls = 0
|
|
|
|
async def is_disconnected(self):
|
|
self._calls += 1
|
|
return self._calls >= self._disconnect_after
|
|
|
|
|
|
async def _read_sse_events(response) -> list[dict]:
|
|
chunks = []
|
|
async for chunk in response.body_iterator:
|
|
chunks.append(chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk))
|
|
|
|
events = []
|
|
for chunk in chunks:
|
|
for line in chunk.splitlines():
|
|
if line.startswith("data: "):
|
|
events.append(json.loads(line[len("data: ") :]))
|
|
return events
|
|
|
|
|
|
class TestMediaDecryptStreamCancel(unittest.TestCase):
|
|
def test_stream_uses_default_concurrency(self):
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account_dir = root / "account"
|
|
wxid_dir = root / "wxid"
|
|
dat_path = wxid_dir / "image.dat"
|
|
resource_dir = account_dir / "resource"
|
|
wxid_dir.mkdir(parents=True, exist_ok=True)
|
|
dat_path.write_bytes(b"encrypted")
|
|
|
|
with mock.patch.object(media_router, "_resolve_account_dir", return_value=account_dir):
|
|
with mock.patch.object(media_router, "_resolve_account_wxid_dir", return_value=wxid_dir):
|
|
with mock.patch.object(media_router, "_load_media_keys", return_value={"xor": 0xA5, "aes": ""}):
|
|
with mock.patch.object(media_router, "_collect_all_dat_files", return_value=[(dat_path, "abc123")]):
|
|
with mock.patch.object(media_router, "_get_resource_dir", return_value=resource_dir):
|
|
with mock.patch.object(media_router, "_try_find_decrypted_resource", return_value=None):
|
|
with mock.patch.object(media_router, "_decrypt_and_save_resource", return_value=(True, "ok")):
|
|
response = asyncio.run(
|
|
media_router.decrypt_all_media_stream(
|
|
request=_FakeDisconnectingRequest(disconnect_after=999),
|
|
account="wxid_demo",
|
|
)
|
|
)
|
|
events = asyncio.run(_read_sse_events(response))
|
|
|
|
self.assertEqual([event.get("type") for event in events], ["scanning", "start", "progress", "complete"])
|
|
self.assertEqual(events[1].get("concurrency"), 10)
|
|
self.assertEqual(events[2].get("concurrency"), 10)
|
|
self.assertEqual(events[3].get("concurrency"), 10)
|
|
|
|
def test_stream_uses_requested_concurrency(self):
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account_dir = root / "account"
|
|
wxid_dir = root / "wxid"
|
|
dat_path = wxid_dir / "image.dat"
|
|
resource_dir = account_dir / "resource"
|
|
wxid_dir.mkdir(parents=True, exist_ok=True)
|
|
dat_path.write_bytes(b"encrypted")
|
|
|
|
with mock.patch.object(media_router, "_resolve_account_dir", return_value=account_dir):
|
|
with mock.patch.object(media_router, "_resolve_account_wxid_dir", return_value=wxid_dir):
|
|
with mock.patch.object(media_router, "_load_media_keys", return_value={"xor": 0xA5, "aes": ""}):
|
|
with mock.patch.object(media_router, "_collect_all_dat_files", return_value=[(dat_path, "abc123")]):
|
|
with mock.patch.object(media_router, "_get_resource_dir", return_value=resource_dir):
|
|
with mock.patch.object(media_router, "_try_find_decrypted_resource", return_value=None):
|
|
with mock.patch.object(media_router, "_decrypt_and_save_resource", return_value=(True, "ok")):
|
|
response = asyncio.run(
|
|
media_router.decrypt_all_media_stream(
|
|
request=_FakeDisconnectingRequest(disconnect_after=999),
|
|
account="wxid_demo",
|
|
concurrency=7,
|
|
)
|
|
)
|
|
events = asyncio.run(_read_sse_events(response))
|
|
|
|
self.assertEqual(events[1].get("concurrency"), 7)
|
|
self.assertEqual(events[2].get("concurrency"), 7)
|
|
self.assertEqual(events[3].get("concurrency"), 7)
|
|
|
|
def test_stream_stops_processing_when_client_disconnects(self):
|
|
with TemporaryDirectory() as td:
|
|
root = Path(td)
|
|
account_dir = root / "account"
|
|
wxid_dir = root / "wxid"
|
|
dat_path = wxid_dir / "image.dat"
|
|
resource_dir = account_dir / "resource"
|
|
wxid_dir.mkdir(parents=True, exist_ok=True)
|
|
dat_path.write_bytes(b"encrypted")
|
|
|
|
request = _FakeDisconnectingRequest(disconnect_after=3)
|
|
decrypt_mock = mock.Mock(return_value=(True, "ok"))
|
|
|
|
with mock.patch.object(media_router, "_resolve_account_dir", return_value=account_dir):
|
|
with mock.patch.object(media_router, "_resolve_account_wxid_dir", return_value=wxid_dir):
|
|
with mock.patch.object(media_router, "_load_media_keys", return_value={"xor": 0xA5, "aes": ""}):
|
|
with mock.patch.object(media_router, "_collect_all_dat_files", return_value=[(dat_path, "abc123")]):
|
|
with mock.patch.object(media_router, "_get_resource_dir", return_value=resource_dir):
|
|
with mock.patch.object(media_router, "_try_find_decrypted_resource", return_value=None):
|
|
with mock.patch.object(media_router, "_decrypt_and_save_resource", decrypt_mock):
|
|
response = asyncio.run(
|
|
media_router.decrypt_all_media_stream(
|
|
request=request,
|
|
account="wxid_demo",
|
|
)
|
|
)
|
|
events = asyncio.run(_read_sse_events(response))
|
|
|
|
self.assertEqual([event.get("type") for event in events], ["scanning", "start"])
|
|
decrypt_mock.assert_not_called()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|