mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
test(sns-media): 补充朋友圈图片缓存优先回归测试
- 覆盖本地缓存命中时不调用远程下载 - 覆盖本地缓存未命中时再回退到远程解密
This commit is contained in:
@@ -14,25 +14,68 @@ from wechat_decrypt_tool.routers import sns # noqa: E402 pylint: disable=wrong
|
|||||||
|
|
||||||
|
|
||||||
class TestSnsMediaRouteWeFlowDefault(unittest.TestCase):
|
class TestSnsMediaRouteWeFlowDefault(unittest.TestCase):
|
||||||
def test_route_stops_after_remote_miss_by_default(self):
|
def test_route_prefers_local_cache_before_remote(self):
|
||||||
with TemporaryDirectory() as td:
|
with TemporaryDirectory() as td:
|
||||||
account_dir = Path(td) / "acc"
|
account_dir = Path(td) / "acc"
|
||||||
account_dir.mkdir(parents=True, exist_ok=True)
|
account_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
local_path = account_dir / "local.jpg"
|
||||||
|
payload = b"\xff\xd8\xff\x00localjpeg"
|
||||||
|
local_path.write_bytes(payload)
|
||||||
|
|
||||||
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_account_dir", return_value=account_dir):
|
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_account_dir", return_value=account_dir):
|
||||||
with mock.patch("wechat_decrypt_tool.routers.sns._try_fetch_and_decrypt_sns_remote", return_value=None):
|
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_account_wxid_dir", return_value=Path(td) / "wxid"):
|
||||||
with self.assertRaises(sns.HTTPException) as ctx:
|
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_sns_cached_image_path", return_value=str(local_path)):
|
||||||
asyncio.run(
|
with mock.patch("wechat_decrypt_tool.routers.sns._read_and_maybe_decrypt_media", return_value=(payload, "image/jpeg")):
|
||||||
sns.get_sns_media(
|
with mock.patch("wechat_decrypt_tool.routers.sns._try_fetch_and_decrypt_sns_remote") as remote:
|
||||||
account="acc",
|
resp = asyncio.run(
|
||||||
url="https://mmsns.qpic.cn/sns/test/0",
|
sns.get_sns_media(
|
||||||
key="123",
|
account="acc",
|
||||||
token="tkn",
|
create_time=1,
|
||||||
use_cache=1,
|
width=1,
|
||||||
)
|
height=1,
|
||||||
)
|
url="https://mmsns.qpic.cn/sns/test/0",
|
||||||
|
key="123",
|
||||||
|
token="tkn",
|
||||||
|
use_cache=1,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
self.assertEqual(ctx.exception.status_code, 404)
|
remote.assert_not_called()
|
||||||
|
self.assertEqual(resp.status_code, 200)
|
||||||
|
self.assertEqual(resp.body, payload)
|
||||||
|
self.assertEqual(resp.headers.get("X-SNS-Source"), "local-cache")
|
||||||
|
|
||||||
|
def test_route_falls_back_to_remote_when_local_cache_misses(self):
|
||||||
|
with TemporaryDirectory() as td:
|
||||||
|
account_dir = Path(td) / "acc"
|
||||||
|
account_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
remote_resp = sns.Response(content=b"remote", media_type="image/jpeg")
|
||||||
|
remote_resp.headers["X-SNS-Source"] = "remote-decrypt"
|
||||||
|
|
||||||
|
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_account_dir", return_value=account_dir):
|
||||||
|
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_account_wxid_dir", return_value=Path(td) / "wxid"):
|
||||||
|
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_sns_cached_image_path", return_value=None):
|
||||||
|
with mock.patch(
|
||||||
|
"wechat_decrypt_tool.routers.sns._try_fetch_and_decrypt_sns_remote",
|
||||||
|
return_value=remote_resp,
|
||||||
|
) as remote:
|
||||||
|
resp = asyncio.run(
|
||||||
|
sns.get_sns_media(
|
||||||
|
account="acc",
|
||||||
|
create_time=1,
|
||||||
|
width=1,
|
||||||
|
height=1,
|
||||||
|
url="https://mmsns.qpic.cn/sns/test/0",
|
||||||
|
key="123",
|
||||||
|
token="tkn",
|
||||||
|
use_cache=1,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
remote.assert_called_once()
|
||||||
|
self.assertEqual(resp.status_code, 200)
|
||||||
|
self.assertEqual(resp.body, b"remote")
|
||||||
|
self.assertEqual(resp.headers.get("X-SNS-Source"), "remote-decrypt")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user