improvement(voice): 语音优先转 MP3,并完善导出/接口兜底

- 新增 _convert_silk_to_browser_audio:优先 MP3(有 ffmpeg),否则 WAV,最后回退 SILK

- /chat/voice 返回浏览器可播格式,并补充 Content-Disposition 文件名后缀

- HTML 导出按实际格式写入语音资源;音频缺失时仍保留语音气泡结构

- 测试覆盖 MP3 优先、缺失音频兜底,并在用例结束 logging.shutdown()
This commit is contained in:
2977094657
2026-03-10 11:50:43 +08:00
Unverified
parent e349e0cfe3
commit 052c6245d1
5 changed files with 330 additions and 50 deletions
+98 -41
View File
@@ -53,7 +53,7 @@ from .chat_helpers import (
) )
from .logging_config import get_logger from .logging_config import get_logger
from .media_helpers import ( from .media_helpers import (
_convert_silk_to_wav, _convert_silk_to_browser_audio,
_detect_image_media_type, _detect_image_media_type,
_fallback_search_media_by_file_id, _fallback_search_media_by_file_id,
_read_and_maybe_decrypt_media, _read_and_maybe_decrypt_media,
@@ -121,9 +121,10 @@ def _resolve_ui_public_dir() -> Optional[Path]:
if ui_dir_env: if ui_dir_env:
candidates.append(Path(ui_dir_env)) candidates.append(Path(ui_dir_env))
# Repo default: `frontend/.output/public` after `npm --prefix frontend run generate`. # Repo defaults: generated Nuxt output or checked-in desktop UI assets.
repo_root = Path(__file__).resolve().parents[2] repo_root = Path(__file__).resolve().parents[2]
candidates.append(repo_root / "frontend" / ".output" / "public") candidates.append(repo_root / "frontend" / ".output" / "public")
candidates.append(repo_root / "desktop" / "resources" / "ui")
for p in candidates: for p in candidates:
try: try:
@@ -622,6 +623,68 @@ body { background: #EDEDED; }
.wce-audio-actions a { font-size: 0.75rem; color: #07c160; text-decoration: none; } .wce-audio-actions a { font-size: 0.75rem; color: #07c160; text-decoration: none; }
.wce-audio-actions a:hover { text-decoration: underline; } .wce-audio-actions a:hover { text-decoration: underline; }
/* Voice message fallback styles (keep close to `frontend/pages/chat/[[username]].vue`). */
.wechat-voice-wrapper { display: flex; width: 100%; position: relative; }
.wechat-voice-bubble {
border-radius: var(--message-radius);
position: relative;
transition: opacity 0.15s ease;
min-width: 80px;
max-width: 200px;
cursor: pointer;
}
.wechat-voice-bubble:hover { opacity: 0.85; }
.wechat-voice-bubble:active { opacity: 0.7; }
.wechat-voice-sent { background: #95EC69; }
.wechat-voice-sent::after {
content: '';
position: absolute;
top: 50%;
right: -4px;
transform: translateY(-50%) rotate(45deg);
width: 10px;
height: 10px;
background: #95EC69;
border-radius: 2px;
}
.wechat-voice-received { background: #fff; }
.wechat-voice-received::before {
content: '';
position: absolute;
top: 50%;
left: -4px;
transform: translateY(-50%) rotate(45deg);
width: 10px;
height: 10px;
background: #fff;
border-radius: 2px;
}
.wechat-voice-content { display: flex; align-items: center; padding: 8px 12px; gap: 8px; }
.wechat-voice-icon { width: 18px; height: 18px; flex-shrink: 0; color: #1a1a1a; }
.wechat-quote-voice-icon { width: 14px; height: 14px; color: inherit; }
.voice-icon-sent { transform: scaleX(-1); }
.wechat-voice-icon.voice-playing .voice-wave-2 { animation: voice-wave-2 1s infinite; }
.wechat-voice-icon.voice-playing .voice-wave-3 { animation: voice-wave-3 1s infinite; }
@keyframes voice-wave-2 {
0%, 33% { opacity: 0; }
34%, 100% { opacity: 1; }
}
@keyframes voice-wave-3 {
0%, 66% { opacity: 0; }
67%, 100% { opacity: 1; }
}
.wechat-voice-duration { font-size: 14px; color: #1a1a1a; }
.wechat-voice-unread {
position: absolute;
top: 50%;
right: -20px;
transform: translateY(-50%);
width: 8px;
height: 8px;
border-radius: 50%;
background: #e75e58;
}
/* Index page helpers. */ /* Index page helpers. */
.wce-index { min-height: 100vh; background: #EDEDED; } .wce-index { min-height: 100vh; background: #EDEDED; }
.wce-index-container { max-width: 880px; margin: 0 auto; padding: 24px; } .wce-index-container { max-width: 880px; margin: 0 auto; padding: 24px; }
@@ -4958,40 +5021,38 @@ def _write_conversation_html(
tw.write(f' <div class="{esc_attr(bubble_base_cls + " " + bubble_dir_cls)}">{render_text_with_emojis(msg.get("content") or "")}</div>\n') tw.write(f' <div class="{esc_attr(bubble_base_cls + " " + bubble_dir_cls)}">{render_text_with_emojis(msg.get("content") or "")}</div>\n')
elif rt == "voice": elif rt == "voice":
voice = offline_path(msg, "voice") voice = offline_path(msg, "voice")
if voice: duration_ms = msg.get("voiceLength")
duration_ms = msg.get("voiceLength") width = get_voice_width(duration_ms)
width = get_voice_width(duration_ms) seconds = get_voice_duration_in_seconds(duration_ms)
seconds = get_voice_duration_in_seconds(duration_ms) voice_dir_cls = "wechat-voice-sent" if is_sent else "wechat-voice-received"
voice_dir_cls = "wechat-voice-sent" if is_sent else "wechat-voice-received" content_dir_cls = " flex-row-reverse" if is_sent else ""
content_dir_cls = " flex-row-reverse" if is_sent else "" icon_dir_cls = "voice-icon-sent" if is_sent else "voice-icon-received"
icon_dir_cls = "voice-icon-sent" if is_sent else "voice-icon-received" voice_id = str(msg.get("id") or "").strip()
voice_id = str(msg.get("id") or "").strip()
tw.write(' <div class="wechat-voice-wrapper">\n') tw.write(' <div class="wechat-voice-wrapper">\n')
tw.write( tw.write(
f' <div class="wechat-voice-bubble msg-radius {esc_attr(voice_dir_cls)}" style="width: {esc_attr(width)}" data-voice-id="{esc_attr(voice_id)}">\n' f' <div class="wechat-voice-bubble msg-radius {esc_attr(voice_dir_cls)}" style="width: {esc_attr(width)}" data-voice-id="{esc_attr(voice_id)}">\n'
) )
tw.write(f' <div class="wechat-voice-content{esc_attr(content_dir_cls)}">\n') tw.write(f' <div class="wechat-voice-content{esc_attr(content_dir_cls)}">\n')
tw.write( tw.write(
f' <svg class="wechat-voice-icon {esc_attr(icon_dir_cls)}" viewBox="0 0 32 32" fill="currentColor">\n' f' <svg class="wechat-voice-icon {esc_attr(icon_dir_cls)}" viewBox="0 0 32 32" fill="currentColor">\n'
) )
tw.write( tw.write(
' <path d="M10.24 11.616l-4.224 4.192 4.224 4.192c1.088-1.056 1.76-2.56 1.76-4.192s-0.672-3.136-1.76-4.192z"></path>\n' ' <path d="M10.24 11.616l-4.224 4.192 4.224 4.192c1.088-1.056 1.76-2.56 1.76-4.192s-0.672-3.136-1.76-4.192z"></path>\n'
) )
tw.write( tw.write(
' <path class="voice-wave-2" d="M15.199 6.721l-1.791 1.76c1.856 1.888 3.008 4.48 3.008 7.328s-1.152 5.44-3.008 7.328l1.791 1.76c2.336-2.304 3.809-5.536 3.809-9.088s-1.473-6.784-3.809-9.088z"></path>\n' ' <path class="voice-wave-2" d="M15.199 6.721l-1.791 1.76c1.856 1.888 3.008 4.48 3.008 7.328s-1.152 5.44-3.008 7.328l1.791 1.76c2.336-2.304 3.809-5.536 3.809-9.088s-1.473-6.784-3.809-9.088z"></path>\n'
) )
tw.write( tw.write(
' <path class="voice-wave-3" d="M20.129 1.793l-1.762 1.76c3.104 3.168 5.025 7.488 5.025 12.256s-1.921 9.088-5.025 12.256l1.762 1.76c3.648-3.616 5.887-8.544 5.887-14.016s-2.239-10.432-5.887-14.016z"></path>\n' ' <path class="voice-wave-3" d="M20.129 1.793l-1.762 1.76c3.104 3.168 5.025 7.488 5.025 12.256s-1.921 9.088-5.025 12.256l1.762 1.76c3.648-3.616 5.887-8.544 5.887-14.016s-2.239-10.432-5.887-14.016z"></path>\n'
) )
tw.write(" </svg>\n") tw.write(" </svg>\n")
tw.write(f' <span class="wechat-voice-duration">{esc_text(seconds)}"</span>\n') tw.write(f' <span class="wechat-voice-duration">{esc_text(seconds)}"</span>\n')
tw.write(" </div>\n") tw.write(" </div>\n")
tw.write(" </div>\n") tw.write(" </div>\n")
if voice:
tw.write(f' <audio src="{esc_attr(voice)}" preload="none" class="hidden"></audio>\n') tw.write(f' <audio src="{esc_attr(voice)}" preload="none" class="hidden"></audio>\n')
tw.write(" </div>\n") tw.write(" </div>\n")
else:
tw.write(f' <div class="{esc_attr(bubble_base_cls + " " + bubble_dir_cls)}">{render_text_with_emojis(msg.get("content") or "")}</div>\n')
elif rt == "file": elif rt == "file":
fsrc = offline_path(msg, "file") fsrc = offline_path(msg, "file")
title = str(msg.get("title") or msg.get("content") or "文件").strip() title = str(msg.get("title") or msg.get("content") or "文件").strip()
@@ -5982,13 +6043,9 @@ def _materialize_voice(
if not isinstance(data, (bytes, bytearray)): if not isinstance(data, (bytes, bytearray)):
data = bytes(data) data = bytes(data)
wav = _convert_silk_to_wav(data) payload, ext, _media_type = _convert_silk_to_browser_audio(data, preferred_format="mp3")
if wav != data and wav[:4] == b"RIFF": if not payload:
ext = "wav" return "", False
payload = wav
else:
ext = "silk"
payload = data
arc = f"media/voices/voice_{int(server_id)}.{ext}" arc = f"media/voices/voice_{int(server_id)}.{ext}"
zf.writestr(arc, payload) zf.writestr(arc, payload)
+108
View File
@@ -1964,6 +1964,114 @@ def _convert_silk_to_wav(silk_data: bytes) -> bytes:
return silk_data return silk_data
def _looks_like_mp3(data: bytes) -> bool:
    """Return True when *data* begins like an MPEG audio (MP3) stream.

    Two leading signatures are accepted:

    * an ID3v2 tag, i.e. the bytes ``b"ID3"``;
    * an MPEG audio frame header: 11 set sync bits followed by a
      non-reserved version ID and a non-reserved layer field.

    Checking the version and layer fields (instead of the sync bits alone)
    keeps ADTS AAC streams, whose header starts ``0xFF 0xF1`` / ``0xFF 0xF9``
    with layer bits ``00``, from being misreported as MP3.
    """
    if not data:
        return False
    if data.startswith(b"ID3"):
        return True
    # Frame sync: first byte all ones, top three bits of the second byte set.
    if len(data) < 2 or data[0] != 0xFF or (data[1] & 0xE0) != 0xE0:
        return False
    version_bits = (data[1] >> 3) & 0x03  # 0b01 is reserved
    layer_bits = (data[1] >> 1) & 0x03  # 0b00 is reserved
    return version_bits != 0x01 and layer_bits != 0x00
@lru_cache(maxsize=1)
def _find_ffmpeg_executable() -> str:
    """Locate the ffmpeg binary, returning ``""`` when none is found.

    The ``WECHAT_TOOL_FFMPEG`` environment variable is consulted first: it is
    tried as a command resolvable by ``shutil.which`` and then as a plain
    file path (with ``~`` expanded). If it is unset or resolves to nothing,
    ``ffmpeg`` is looked up on ``PATH``.

    The result is memoized by ``lru_cache``, so later changes to the
    environment are not observed until the cache is cleared.
    """
    import shutil

    override = str(os.environ.get("WECHAT_TOOL_FFMPEG") or "").strip()
    if not override:
        return shutil.which("ffmpeg") or ""

    on_path = shutil.which(override)
    if on_path:
        return on_path

    as_file = Path(override).expanduser()
    if as_file.is_file():
        return str(as_file)

    # The override did not resolve; fall back to the default lookup.
    return shutil.which("ffmpeg") or ""
def _convert_wav_to_mp3(wav_data: bytes, *, timeout: float = 60.0) -> bytes:
    """Encode WAV bytes as MP3 by running ffmpeg on temporary files.

    Args:
        wav_data: Audio bytes; anything that does not start with ``b"RIFF"``
            is rejected without invoking ffmpeg.
        timeout: Maximum number of seconds to wait for ffmpeg.

    Returns:
        The encoded MP3 bytes, or ``b""`` when the input is not RIFF data,
        no ffmpeg executable is found, ffmpeg fails or times out, or the
        produced file does not pass ``_looks_like_mp3``. This function is
        best-effort and never raises; failures are logged as warnings.
    """
    import subprocess
    import tempfile

    if not wav_data or not wav_data.startswith(b"RIFF"):
        return b""

    ffmpeg_exe = _find_ffmpeg_executable()
    if not ffmpeg_exe:
        return b""

    try:
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            wav_path = tmp_path / "voice.wav"
            mp3_path = tmp_path / "voice.mp3"
            wav_path.write_bytes(wav_data)

            proc = subprocess.run(
                [
                    ffmpeg_exe,
                    "-y",
                    "-hide_banner",
                    "-loglevel",
                    "error",
                    "-i",
                    str(wav_path),
                    "-vn",
                    "-codec:a",
                    "libmp3lame",
                    "-q:a",
                    "4",
                    str(mp3_path),
                ],
                check=False,
                capture_output=True,
                # Do not let ffmpeg read from (or wait on) the caller's stdin.
                stdin=subprocess.DEVNULL,
                # Bound the wait so a stuck encoder cannot block the caller.
                timeout=timeout,
            )
            if proc.returncode != 0 or not mp3_path.exists():
                err = proc.stderr.decode("utf-8", errors="ignore").strip()
                logger.warning(
                    "WAV to MP3 conversion failed: %s",
                    err or f"ffmpeg exited with code {proc.returncode}",
                )
                return b""

            mp3_data = mp3_path.read_bytes()
            if _looks_like_mp3(mp3_data):
                return mp3_data
            logger.warning("WAV to MP3 conversion failed: ffmpeg output is not recognizable MP3 data")
    except subprocess.TimeoutExpired:
        logger.warning("WAV to MP3 conversion failed: ffmpeg timed out after %s seconds", timeout)
    except Exception as e:
        # Deliberately broad: conversion is optional and callers fall back to WAV.
        logger.warning("WAV to MP3 conversion failed: %s", e)
    return b""
def _convert_silk_to_browser_audio(
    silk_data: bytes,
    *,
    preferred_format: str = "mp3",
) -> tuple[bytes, str, str]:
    """Turn voice bytes into something a browser can play.

    Returns a ``(payload, ext, media_type)`` triple, chosen in this order:

    1. empty input -> ``(b"", "silk", "audio/silk")``;
    2. input that already looks like MP3 -> returned unchanged as MP3;
    3. input that is (or decodes to) RIFF data -> MP3 when
       ``preferred_format`` is ``"mp3"`` and the MP3 encode yields bytes,
       otherwise the WAV bytes;
    4. anything else -> the original bytes, labelled as SILK.
    """
    raw = bytes(silk_data or b"")
    if not raw:
        return b"", "silk", "audio/silk"

    if _looks_like_mp3(raw):
        return raw, "mp3", "audio/mpeg"

    # Input that is already RIFF skips the SILK decoder.
    if raw.startswith(b"RIFF"):
        wav_bytes = raw
    else:
        wav_bytes = _convert_silk_to_wav(raw)

    if not wav_bytes.startswith(b"RIFF"):
        # Decoding did not yield WAV; hand back the untouched input.
        return raw, "silk", "audio/silk"

    wants_mp3 = str(preferred_format or "").strip().lower() == "mp3"
    if wants_mp3:
        encoded = _convert_wav_to_mp3(wav_bytes)
        if encoded:
            return encoded, "mp3", "audio/mpeg"

    return wav_bytes, "wav", "audio/wav"
def _resolve_media_path_for_kind( def _resolve_media_path_for_kind(
account_dir: Path, account_dir: Path,
kind: str, kind: str,
+13 -8
View File
@@ -33,7 +33,7 @@ from ..avatar_cache import (
) )
from ..logging_config import get_logger from ..logging_config import get_logger
from ..media_helpers import ( from ..media_helpers import (
_convert_silk_to_wav, _convert_silk_to_browser_audio,
_decrypt_emoticon_aes_cbc, _decrypt_emoticon_aes_cbc,
_detect_image_extension, _detect_image_extension,
_detect_image_media_type, _detect_image_media_type,
@@ -1762,12 +1762,12 @@ async def get_chat_voice(server_id: int, account: Optional[str] = None):
if not isinstance(data, (bytes, bytearray)): if not isinstance(data, (bytes, bytearray)):
data = bytes(data) data = bytes(data)
# Try to convert SILK to WAV for browser playback payload, ext, media_type = _convert_silk_to_browser_audio(data, preferred_format="mp3")
wav_data = _convert_silk_to_wav(data) if payload and ext != "silk":
if wav_data != data:
return Response( return Response(
content=wav_data, content=payload,
media_type="audio/wav", media_type=media_type,
headers={"Content-Disposition": f"inline; filename=voice_{int(server_id)}.{ext}"},
) )
# Fallback to raw SILK if conversion fails # Fallback to raw SILK if conversion fails
@@ -1821,11 +1821,16 @@ async def open_chat_media_folder(
if not isinstance(data, (bytes, bytearray)): if not isinstance(data, (bytes, bytearray)):
data = bytes(data) data = bytes(data)
payload, ext, _media_type = _convert_silk_to_browser_audio(data, preferred_format="mp3")
if not payload:
payload = data
ext = "silk"
export_dir = account_dir / "_exports" export_dir = account_dir / "_exports"
export_dir.mkdir(parents=True, exist_ok=True) export_dir.mkdir(parents=True, exist_ok=True)
p = export_dir / f"voice_{int(server_id)}.silk" p = export_dir / f"voice_{int(server_id)}.{ext}"
try: try:
p.write_bytes(data) p.write_bytes(payload)
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to export voice: {e}") raise HTTPException(status_code=500, detail=f"Failed to export voice: {e}")
else: else:
+109 -1
View File
@@ -1,6 +1,8 @@
import os import os
import json import json
import hashlib import hashlib
import logging
import re
import sqlite3 import sqlite3
import sys import sys
import unittest import unittest
@@ -243,6 +245,22 @@ class TestChatExportHtmlFormat(unittest.TestCase):
self._seed_media_files(account_dir) self._seed_media_files(account_dir)
return account_dir return account_dir
def _insert_missing_voice_message(self, account_dir: Path, *, username: str, server_id: int, duration_ms: int) -> None:
    """Append a voice message row (local_type 34) to the conversation table.

    The row is written to ``message_0.db`` under *account_dir*, in the
    ``msg_<md5(username)>`` table, with ``local_id`` and ``sort_seq`` set to
    one past the current maximum. Only the message row is inserted; this
    helper writes no audio data anywhere.
    """
    db = sqlite3.connect(str(account_dir / "message_0.db"))
    try:
        table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
        max_local_id, max_sort_seq = db.execute(
            f"SELECT COALESCE(MAX(local_id), 0), COALESCE(MAX(sort_seq), 0) FROM {table_name}"
        ).fetchone()
        voice_xml = f'<msg><voicemsg voicelength="{int(duration_ms)}" /></msg>'
        new_row = (
            int(max_local_id or 0) + 1,
            int(server_id),
            34,
            int(max_sort_seq or 0) + 1,
            2,
            1735689700,
            voice_xml,
            None,
        )
        db.execute(
            f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            new_row,
        )
        db.commit()
    finally:
        db.close()
def _create_job(self, manager, *, account: str, username: str): def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job( job = manager.create_job(
account=account, account=account,
@@ -283,7 +301,14 @@ class TestChatExportHtmlFormat(unittest.TestCase):
try: try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules() svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username) original_converter = svc._convert_silk_to_browser_audio
svc._convert_silk_to_browser_audio = (
lambda data, preferred_format="mp3": (bytes(data or b""), "silk", "audio/silk")
)
try:
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
finally:
svc._convert_silk_to_browser_audio = original_converter
self.assertEqual(job.status, "done", msg=job.error) self.assertEqual(job.status, "done", msg=job.error)
self.assertTrue(job.zip_path and job.zip_path.exists()) self.assertTrue(job.zip_path and job.zip_path.exists())
@@ -332,6 +357,8 @@ class TestChatExportHtmlFormat(unittest.TestCase):
css_text = zf.read("assets/wechat-chat-export.css").decode("utf-8", errors="ignore") css_text = zf.read("assets/wechat-chat-export.css").decode("utf-8", errors="ignore")
self.assertIn("wechat-transfer-card", css_text) self.assertIn("wechat-transfer-card", css_text)
self.assertRegex(css_text, re.compile(r"\.wechat-voice-sent(?::|::)after"))
self.assertRegex(css_text, re.compile(r"\.wechat-voice-received(?::|::)before"))
self.assertNotIn("wechat-transfer-card[data-v-", css_text) self.assertNotIn("wechat-transfer-card[data-v-", css_text)
self.assertNotIn("bento-container", css_text) self.assertNotIn("bento-container", css_text)
@@ -346,6 +373,87 @@ class TestChatExportHtmlFormat(unittest.TestCase):
self.assertIn("wxemoji/Expression_1@2x.png", names) self.assertIn("wxemoji/Expression_1@2x.png", names)
self.assertIn("../../wxemoji/Expression_1@2x.png", html_text) self.assertIn("../../wxemoji/Expression_1@2x.png", html_text)
finally: finally:
logging.shutdown()
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
def test_html_export_prefers_mp3_for_voice_assets(self):
    """When the converter yields MP3, the export stores and links a .mp3 file."""

    def fake_converter(data, preferred_format="mp3"):
        # Stand-in for the real converter: always report a tiny MP3 payload.
        return (b"ID3FAKE_MP3_DATA", "mp3", "audio/mpeg")

    with TemporaryDirectory() as td:
        root = Path(td)
        account = "wxid_test"
        username = "wxid_friend"
        self._prepare_account(root, account=account, username=username)

        saved_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR")
        try:
            os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
            svc = self._reload_export_modules()

            # Swap the converter only for the duration of the export job.
            real_converter = svc._convert_silk_to_browser_audio
            svc._convert_silk_to_browser_audio = fake_converter
            try:
                job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
            finally:
                svc._convert_silk_to_browser_audio = real_converter

            self.assertEqual(job.status, "done", msg=job.error)
            self.assertTrue(job.zip_path and job.zip_path.exists())

            with zipfile.ZipFile(job.zip_path, "r") as zf:
                names = set(zf.namelist())

                voice_path = f"media/voices/voice_{self._VOICE_SERVER_ID}.mp3"
                self.assertIn(voice_path, names)
                self.assertNotIn(f"media/voices/voice_{self._VOICE_SERVER_ID}.wav", names)

                html_path = next((n for n in names if n.endswith("/messages.html")), "")
                self.assertTrue(html_path)
                html_text = zf.read(html_path).decode("utf-8")
                self.assertIn(f"../../{voice_path}", html_text)
        finally:
            logging.shutdown()
            if saved_data_dir is not None:
                os.environ["WECHAT_TOOL_DATA_DIR"] = saved_data_dir
            else:
                os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
def test_html_export_keeps_voice_bubble_when_audio_file_missing(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
account_dir = self._prepare_account(root, account=account, username=username)
self._insert_missing_voice_message(account_dir, username=username, server_id=999999, duration_ms=6543)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
original_converter = svc._convert_silk_to_browser_audio
svc._convert_silk_to_browser_audio = (
lambda data, preferred_format="mp3": (bytes(data or b""), "silk", "audio/silk")
)
try:
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
finally:
svc._convert_silk_to_browser_audio = original_converter
self.assertEqual(job.status, "done", msg=job.error)
self.assertTrue(job.zip_path and job.zip_path.exists())
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn("wechat-voice-wrapper", html_text)
self.assertIn('data-render-type="voice"', html_text)
self.assertIn('data-voice-id="message_0:msg_d5616d78f22fe35c632f66cabecfc82d:11"', html_text)
self.assertIn('class="wechat-voice-duration">7"</span>', html_text)
finally:
logging.shutdown()
if prev_data is None: if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None) os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else: else:
+2
View File
@@ -1,6 +1,7 @@
import os import os
import json import json
import hashlib import hashlib
import logging
import sqlite3 import sqlite3
import sys import sys
import unittest import unittest
@@ -215,6 +216,7 @@ class TestChatExportHtmlPaging(unittest.TestCase):
page1_text = zf.read(page1_js).decode("utf-8", errors="ignore") page1_text = zf.read(page1_js).decode("utf-8", errors="ignore")
self.assertIn("MSG0001", page1_text) self.assertIn("MSG0001", page1_text)
finally: finally:
logging.shutdown()
if prev_data is None: if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None) os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else: else: