diff --git a/frontend/components/chat/ChatOverlays.vue b/frontend/components/chat/ChatOverlays.vue index 06008fe..e64d7ec 100644 --- a/frontend/components/chat/ChatOverlays.vue +++ b/frontend/components/chat/ChatOverlays.vue @@ -1440,39 +1440,12 @@
消息类型(导出内容)
-
- - - - -
已选 {{ exportMessageTypes.length }} 项
-
-
- @@ -1512,7 +1485,7 @@ v-if="exportFolder" type="button" class="text-sm px-3 py-2 rounded-md bg-white border border-gray-200 hover:bg-gray-50" - @click="exportFolder = ''; exportFolderHandle = null; exportSaveMsg = ''" + @click="clearExportFolderSelection" > 清空 @@ -1563,34 +1536,32 @@
消息:{{ exportJob.progress?.messagesExported || 0 }};媒体:{{ exportJob.progress?.mediaCopied || 0 }};缺失:{{ exportJob.progress?.mediaMissing || 0 }}
-
- +
+
+ 实际生成位置: +
{{ exportBackendZipPath || '未生成' }}
+
+
+ 浏览器目录: +
{{ exportFolder || '未选择' }}
+
+
{{ exportSaveProgressText }}
+
{{ exportSaveMsg }}
+
{{ exportSaveError }}
+
+ 浏览器模式通常会在写入完成后才显示文件,且出于安全限制,这里只能显示目录名,不能显示完整磁盘路径。 +
+
+ +
下载 ZIP -
-
{{ exportSaveMsg }}
{{ exportJob.error || '导出失败' }} @@ -1603,6 +1574,7 @@ 关闭 +
diff --git a/frontend/composables/chat/useChatExport.js b/frontend/composables/chat/useChatExport.js index 5843f76..148437d 100644 --- a/frontend/composables/chat/useChatExport.js +++ b/frontend/composables/chat/useChatExport.js @@ -35,7 +35,12 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const exportFolderHandle = ref(null) const exportSaveBusy = ref(false) const exportSaveMsg = ref('') + const exportSaveError = ref('') + const exportSaveState = ref('idle') + const exportSaveBytesWritten = ref(0) + const exportSaveBytesTotal = ref(0) const exportAutoSavedFor = ref('') + const exportCancelRequested = ref(false) const exportSearchQuery = ref('') const exportListTab = ref('all') @@ -50,6 +55,27 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const next = Number(value) return Number.isFinite(next) ? next : 0 } + const formatBytes = (value) => { + const bytes = Number(value) + if (!Number.isFinite(bytes) || bytes <= 0) return '0 B' + const units = ['B', 'KB', 'MB', 'GB', 'TB'] + let size = bytes + let index = 0 + while (size >= 1024 && index < units.length - 1) { + size /= 1024 + index += 1 + } + const digits = size >= 100 || index === 0 ? 0 : size >= 10 ? 1 : 2 + return `${size.toFixed(digits)} ${units[index]}` + } + const resetExportSaveFeedback = ({ resetAutoSavedFor = false } = {}) => { + exportSaveMsg.value = '' + exportSaveError.value = '' + exportSaveState.value = 'idle' + exportSaveBytesWritten.value = 0 + exportSaveBytesTotal.value = 0 + if (resetAutoSavedFor) exportAutoSavedFor.value = '' + } const exportOverallPercent = computed(() => { const job = exportJob.value @@ -72,6 +98,17 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte if (total <= 0) return null return Math.round(clamp01(done / total) * 100) }) + const exportBackendZipPath = computed(() => { + return String(exportJob.value?.zipPath || '').trim() + }) + const exportSaveProgressText = computed(() => { + if (exportSaveState.value !== 'saving') return '' + const fileName = guessExportZipName(exportJob.value) + if (exportSaveBytesTotal.value > 0) { + return `正在保存到浏览器目录:${fileName}(${formatBytes(exportSaveBytesWritten.value)} / ${formatBytes(exportSaveBytesTotal.value)})` + } + return `正在保存到浏览器目录:${fileName}(${formatBytes(exportSaveBytesWritten.value)})` + }) const normalizeExportSelectedUsernames = (list) => { const seen = new Set() @@ -179,7 +216,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const chooseExportFolder = async () => { exportError.value = '' - exportSaveMsg.value = '' + resetExportSaveFeedback() try { if (!process.client) { exportError.value = '当前环境不支持选择导出目录' @@ -206,6 +243,10 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte exportError.value = '当前浏览器不支持目录选择,请使用桌面端或 Chromium 新版浏览器' } catch (error) { + const message = String(error?.message || '').trim() + if (error?.name === 'AbortError' || message.includes('The user aborted a request')) { + return + } exportError.value = error?.message || '选择导出目录失败' } } @@ -227,7 +268,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const saveExportToSelectedFolder = async (options = {}) => { const autoSave = !!options?.auto exportError.value = '' - exportSaveMsg.value = '' + resetExportSaveFeedback() if (!process.client || !isWebDirectoryPickerSupported()) { exportError.value = '当前环境不支持保存到浏览器目录' return @@ -245,6 +286,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte } exportSaveBusy.value = true + exportSaveState.value = 'saving' try { const response = await fetch(getExportDownloadUrl(exportId)) if (!response.ok) { @@ -256,18 +298,46 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte }) throw new Error(`下载导出文件失败(${response.status})`) } - const blob = await response.blob() + exportSaveBytesTotal.value = asNumber(response.headers.get('Content-Length')) const fileName = guessExportZipName(exportJob.value) const fileHandle = await handle.getFileHandle(fileName, { create: true }) const writable = await fileHandle.createWritable() - await writable.write(blob) - await writable.close() + if (response.body && typeof response.body.getReader === 'function') { + const reader = response.body.getReader() + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + if (!value || !value.byteLength) continue + await writable.write(value) + exportSaveBytesWritten.value += value.byteLength + } + await writable.close() + } catch (error) { + try { + await reader.cancel() + } catch {} + try { + await writable.abort() + } catch {} + throw error + } + } else { + const blob = await response.blob() + exportSaveBytesWritten.value = asNumber(blob.size) + if (exportSaveBytesTotal.value <= 0) exportSaveBytesTotal.value = exportSaveBytesWritten.value + await writable.write(blob) + await writable.close() + } exportAutoSavedFor.value = String(exportId) + exportSaveState.value = 'success' + const folderLabel = String(exportFolder.value || '').trim() || '已选目录' exportSaveMsg.value = autoSave - ? `已自动保存到已选目录:${fileName}` - : `已保存到已选目录:${fileName}` + ? `浏览器目录自动保存成功:${fileName}\n位置:${folderLabel}` + : `浏览器目录保存成功:${fileName}\n位置:${folderLabel}` } catch (error) { - exportError.value = error?.message || '保存到浏览器目录失败' + exportSaveState.value = 'error' + exportSaveError.value = `浏览器目录保存失败:${error?.message || '未知错误'}` } finally { exportSaveBusy.value = false } @@ -337,7 +407,8 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const openExportModal = () => { exportModalOpen.value = true exportError.value = '' - exportSaveMsg.value = '' + resetExportSaveFeedback({ resetAutoSavedFor: true }) + exportCancelRequested.value = false exportSearchQuery.value = '' exportListTab.value = 'all' exportSelectedUsernames.value = [] @@ -356,6 +427,12 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte exportError.value = '' } + const clearExportFolderSelection = () => { + exportFolder.value = '' + exportFolderHandle.value = null + resetExportSaveFeedback({ resetAutoSavedFor: true }) + } + watch(exportModalOpen, (open) => { if (!process.client) return if (!open) { @@ -382,6 +459,9 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte status: String(exportJob.value?.status || '') }), async ({ exportId, status }) => { + if (status !== 'queued' && status !== 'running') { + exportCancelRequested.value = false + } if (!process.client || status !== 'done' || !exportId) return if (!hasWebExportFolder.value) return if (exportAutoSavedFor.value === exportId) return @@ -392,7 +472,8 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const startChatExport = async () => { exportError.value = '' - exportSaveMsg.value = '' + resetExportSaveFeedback({ resetAutoSavedFor: true }) + exportCancelRequested.value = false if (!selectedAccount.value) { exportError.value = '未选择账号' return @@ -490,13 +571,17 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte const cancelCurrentExport = async () => { const exportId = exportJob.value?.exportId - if (!exportId) return + const status = String(exportJob.value?.status || '') + if (!exportId || (status !== 'queued' && status !== 'running') || exportCancelRequested.value) return + exportError.value = '' + exportCancelRequested.value = true try { await api.cancelChatExport(exportId) const response = await api.getChatExport(exportId) exportJob.value = response?.job || exportJob.value } catch (error) { + exportCancelRequested.value = false exportError.value = error?.message || '取消导出失败' } } @@ -518,7 +603,12 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte exportFolderHandle, exportSaveBusy, exportSaveMsg, + exportSaveError, + exportSaveState, + exportSaveProgressText, + exportBackendZipPath, exportAutoSavedFor, + exportCancelRequested, exportSearchQuery, exportListTab, exportSelectedUsernames, @@ -532,6 +622,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte isExportContactSelected, hasWebExportFolder, chooseExportFolder, + clearExportFolderSelection, getExportDownloadUrl, saveExportToSelectedFolder, openExportModal, diff --git a/src/wechat_decrypt_tool/chat_export_service.py b/src/wechat_decrypt_tool/chat_export_service.py index 3f721e3..ed9ce3d 100644 --- a/src/wechat_decrypt_tool/chat_export_service.py +++ b/src/wechat_decrypt_tool/chat_export_service.py @@ -51,8 +51,10 @@ from .chat_helpers import ( _should_keep_session, _split_group_sender_prefix, ) +from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC from .logging_config import get_logger from .media_helpers import ( + MediaPathIndex, _convert_silk_to_browser_audio, _detect_image_media_type, _fallback_search_media_by_file_id, @@ -62,6 +64,7 @@ from .media_helpers import ( _resolve_media_path_for_kind, _try_find_decrypted_resource, ) +from .perf_trace import create_perf_trace logger = get_logger(__name__) @@ -70,6 +73,88 @@ ExportScope = Literal["selected", "all", "groups", "singles"] ExportStatus = Literal["queued", "running", "done", "error", "cancelled"] MediaKind = Literal["image", "emoji", "video", "video_thumb", "voice", "file"] +_EXPORT_PROGRESS_LOG_INTERVAL = 1000 +_EXPORT_SLOW_STEP_MS = 500.0 + + +def _elapsed_ms(started_at: float) -> float: + return round((time.perf_counter() - started_at) * 1000.0, 1) + + +def _safe_json_dumps(value: Any) -> str: + try: + return json.dumps(value, ensure_ascii=False, default=str) + except Exception: + return str(value) + + +def _safe_trace(trace_log: Optional[Callable[..., None]], phase: str, **fields: Any) -> None: + if trace_log is None: + return + try: + trace_log(phase, **fields) + except Exception: + pass + + +def _log_export_slow_step(stage: str, started_at: float, **fields: Any) -> None: + elapsed = _elapsed_ms(started_at) + if elapsed < _EXPORT_SLOW_STEP_MS: + return + payload = { + **fields, + "stage": stage, + "elapsedMs": elapsed, + "thread": threading.current_thread().name, + } + logger.info("chat export slow step %s", _safe_json_dumps(payload)) + + +def _raise_if_job_cancelled( + job: Any, + stage: str, + trace_log: Optional[Callable[..., None]] = None, + **fields: Any, +) -> None: + if not bool(getattr(job, "cancel_requested", False)): + return + export_id = str(getattr(job, "export_id", "") or "") + payload = { + **fields, + "exportId": export_id, + "stage": stage, + "thread": threading.current_thread().name, + } + _safe_trace(trace_log, "cancel_detected", **payload) + logger.info("chat export cancel detected %s", _safe_json_dumps(payload)) + raise _JobCancelled() + + +def _log_writer_progress( + trace_log: Optional[Callable[..., None]], + *, + export_format: str, + job: Any, + conv_username: str, + scanned: int, + exported: int, + force: bool = False, +) -> None: + if not force and (scanned <= 0 or scanned % _EXPORT_PROGRESS_LOG_INTERVAL != 0): + return + progress = getattr(job, "progress", None) + _safe_trace( + trace_log, + "writer_progress", + format=export_format, + conversation=conv_username, + scanned=scanned, + exported=exported, + messagesExported=int(getattr(progress, "messages_exported", 0) or 0), + mediaCopied=int(getattr(progress, "media_copied", 0) or 0), + mediaMissing=int(getattr(progress, "media_missing", 0) or 0), + ) + def _now_iso() -> str: return datetime.now().isoformat(timespec="seconds") @@ -343,6 +428,7 @@ _REMOTE_IMAGE_MAX_BYTES = 5 * 1024 * 1024 _REMOTE_IMAGE_TIMEOUT = (5, 10) _REMOTE_IMAGE_ALLOWED_CT: dict[str, str] = { "image/jpeg": "jpg", + "image/jpg": "jpg", "image/png": "png", "image/gif": "gif", "image/webp": "webp", @@ -394,6 +480,7 @@ def _download_remote_image_to_zip( remote_written: dict[str, str], report: dict[str, Any], ) -> str: + started_at = time.perf_counter() raw = str(url or "").strip() if not raw: return "" @@ -485,6 +572,15 @@ def _download_remote_image_to_zip( arc = f"media/remote/{h[:32]}.{ext}" zf.writestr(arc, bytes(buf)) remote_written[raw] = arc + _log_export_slow_step( + "download_remote_image", + started_at, + url=raw, + finalUrl=current, + arc=arc, + contentType=ct, + bytes=len(buf), + ) return arc except Exception as e: last_error = f"request failed: {e}" @@ -502,6 +598,13 @@ def _download_remote_image_to_zip( except Exception: pass remote_written[raw] = "" + _log_export_slow_step( + "download_remote_image_failed", + started_at, + url=raw, + finalUrl=current, + error=last_error, + ) return "" @@ -2476,11 +2579,32 @@ class ChatExportManager: with self._lock: job = self._jobs.get(export_id) if not job: + logger.info("chat export cancel requested for missing job export_id=%s", export_id) return False job.cancel_requested = True + logger.info( + "chat export cancel requested %s", + _safe_json_dumps( + { + "exportId": job.export_id, + "status": job.status, + "createdAt": job.created_at, + "startedAt": job.started_at, + "progress": { + "conversationsDone": job.progress.conversations_done, + "conversationsTotal": job.progress.conversations_total, + "currentConversationIndex": job.progress.current_conversation_index, + "messagesExported": job.progress.messages_exported, + "mediaCopied": job.progress.media_copied, + "mediaMissing": job.progress.media_missing, + }, + } + ), + ) if job.status in {"queued"}: job.status = "cancelled" job.finished_at = time.time() + logger.info("chat export queued job cancelled export_id=%s", job.export_id) return True def create_job( @@ -2534,6 +2658,17 @@ class ChatExportManager: with self._lock: self._jobs[export_id] = job + logger.info( + "chat export job created %s", + _safe_json_dumps( + { + "exportId": job.export_id, + "account": account_dir.name, + "options": job.options, + } + ), + ) + t = threading.Thread( target=self._run_job_safe, args=(job, account_dir), @@ -2565,6 +2700,34 @@ class ChatExportManager: job.started_at = time.time() job.error = "" + _trace_id, trace = create_perf_trace( + logger, + "chat_export_job", + exportId=job.export_id, + account=account_dir.name, + ) + _safe_trace(trace, "job_started", thread=threading.current_thread().name) + realtime_pause_reason = f"chat_export:{job.export_id}" + realtime_paused = False + try: + pause_depth = CHAT_REALTIME_AUTOSYNC.pause_account(account_dir.name, reason=realtime_pause_reason) + realtime_paused = bool(pause_depth > 0) + _safe_trace( + trace, + "realtime_autosync_paused", + account=account_dir.name, + reason=realtime_pause_reason, + depth=int(pause_depth), + ) + except Exception: + logger.exception("failed to pause realtime autosync account=%s export_id=%s", account_dir.name, job.export_id) + _safe_trace( + trace, + "realtime_autosync_pause_failed", + account=account_dir.name, + reason=realtime_pause_reason, + ) + opts = dict(job.options or {}) scope: ExportScope = str(opts.get("scope") or "selected") # type: ignore[assignment] export_format_raw = str(opts.get("format") or "json").strip() or "json" @@ -2612,6 +2775,23 @@ class ChatExportManager: local_types = None estimate_local_types = None + _safe_trace( + trace, + "options_resolved", + scope=scope, + format=export_format, + includeMedia=include_media, + mediaKinds=media_kinds, + messageTypes=sorted(want_types) if want_types else None, + startTime=st, + endTime=et, + htmlPageSize=html_page_size, + downloadRemoteMedia=download_remote_media, + privacyMode=privacy_mode, + ) + _raise_if_job_cancelled(job, "options_resolved", trace) + + phase_started = time.perf_counter() target_usernames = _resolve_export_targets( account_dir=account_dir, scope=scope, @@ -2619,11 +2799,20 @@ class ChatExportManager: include_hidden=include_hidden, include_official=include_official, ) + _safe_trace( + trace, + "targets_resolved", + durationMs=_elapsed_ms(phase_started), + conversationCount=len(target_usernames), + scope=scope, + ) if not target_usernames: raise ValueError("No target conversations to export.") + phase_started = time.perf_counter() exports_root = _resolve_export_output_dir(account_dir, opts.get("outputDir")) ts = datetime.now().strftime("%Y%m%d_%H%M%S") + _safe_trace(trace, "output_dir_resolved", durationMs=_elapsed_ms(phase_started), outputDir=str(exports_root)) base_name = str(opts.get("fileName") or "").strip() if not base_name: @@ -2638,12 +2827,14 @@ class ChatExportManager: final_zip = (exports_root / base_name).resolve() tmp_zip = (exports_root / f".{base_name}.{job.export_id}.part").resolve() + _safe_trace(trace, "zip_paths_prepared", finalZip=str(final_zip), tmpZip=str(tmp_zip)) contact_db_path = account_dir / "contact.db" message_resource_db_path = account_dir / "message_resource.db" media_db_path = account_dir / "media_0.db" head_image_db_path = account_dir / "head_image.db" + phase_started = time.perf_counter() resource_conn: Optional[sqlite3.Connection] = None try: if message_resource_db_path.exists(): @@ -2670,6 +2861,16 @@ class ChatExportManager: pass head_image_conn = None + _safe_trace( + trace, + "db_connections_opened", + durationMs=_elapsed_ms(phase_started), + hasResourceDb=resource_conn is not None, + hasHeadImageDb=head_image_conn is not None, + hasMediaDb=media_db_path.exists(), + ) + _raise_if_job_cancelled(job, "db_connections_opened", trace) + contact_cache: dict[str, str] = {} contact_row_cache: dict[str, sqlite3.Row] = {} @@ -2686,10 +2887,40 @@ class ChatExportManager: contact_cache[u] = name return name + phase_started = time.perf_counter() conv_rows = _load_contact_rows(contact_db_path, target_usernames) for k, v in conv_rows.items(): contact_row_cache[k] = v contact_cache[k] = _pick_display_name(v, k) + _safe_trace( + trace, + "contacts_preloaded", + durationMs=_elapsed_ms(phase_started), + requested=len(target_usernames), + loaded=len(conv_rows), + ) + _raise_if_job_cancelled(job, "contacts_preloaded", trace) + + media_index: Optional[MediaPathIndex] = None + if include_media and any(kind in {"image", "emoji", "video", "video_thumb", "file"} for kind in media_kinds): + phase_started = time.perf_counter() + media_index = MediaPathIndex.build( + account_dir=account_dir, + usernames=target_usernames, + media_kinds=media_kinds, + ) + _safe_trace( + trace, + "media_index_built", + durationMs=_elapsed_ms(phase_started), + usernames=len(target_usernames), + mediaKinds=media_kinds, + md5Keys=int(media_index.stats.get("md5Keys") or 0), + fileIdKeys=int(media_index.stats.get("fileIdKeys") or 0), + scannedFiles=int(media_index.stats.get("scannedFiles") or 0), + hardlinkRows=int(media_index.stats.get("hardlinkRows") or 0), + ) + _raise_if_job_cancelled(job, "media_index_built", trace) media_written: dict[str, str] = {} avatar_written: dict[str, str] = {} @@ -2708,6 +2939,7 @@ class ChatExportManager: job.progress.messages_exported = 0 job.progress.media_copied = 0 job.progress.media_missing = 0 + _safe_trace(trace, "progress_initialized", conversationCount=len(target_usernames)) try: if tmp_zip.exists(): @@ -2716,13 +2948,18 @@ class ChatExportManager: except Exception: pass + phase_started = time.perf_counter() + _safe_trace(trace, "zip_open_start", tmpZip=str(tmp_zip)) with zipfile.ZipFile(tmp_zip, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=6) as zf: + _safe_trace(trace, "zip_opened", durationMs=_elapsed_ms(phase_started)) html_index_items: list[dict[str, Any]] = [] self_avatar_path = "" session_items: list[dict[str, Any]] = [] remote_written: dict[str, str] = {} remote_download_enabled = bool(download_remote_media) and (export_format == "html") and include_media and (not privacy_mode) if export_format == "html": + phase_started = time.perf_counter() + _safe_trace(trace, "html_assets_start") ui_public_dir = _resolve_ui_public_dir() css_payload = _load_ui_css_bundle(ui_public_dir=ui_public_dir, report=report) zf.writestr("assets/wechat-chat-export.css", css_payload) @@ -2767,11 +3004,20 @@ class ChatExportManager: dest_prefix="assets/images/wechat", written=static_written, ) + _safe_trace( + trace, + "html_assets_done", + durationMs=_elapsed_ms(phase_started), + uiPublicDir=str(ui_public_dir) if ui_public_dir is not None else "", + staticFiles=len(static_written), + ) + _raise_if_job_cancelled(job, "html_assets_done", trace) preview_by_username: dict[str, str] = {} last_ts_by_username: dict[str, int] = {} if not privacy_mode: + phase_started = time.perf_counter() self_avatar_path = _materialize_avatar( zf=zf, head_image_conn=head_image_conn, @@ -2821,8 +3067,19 @@ class ChatExportManager: last_ts_by_username = {} finally: sconn.close() + _safe_trace( + trace, + "html_session_metadata_loaded", + durationMs=_elapsed_ms(phase_started), + previews=len(preview_by_username), + lastTimestamps=len(last_ts_by_username), + hasSelfAvatar=bool(self_avatar_path), + ) + _raise_if_job_cancelled(job, "html_session_metadata_loaded", trace) + phase_started = time.perf_counter() for idx, conv_username in enumerate(target_usernames, start=1): + _raise_if_job_cancelled(job, "html_session_index", trace, index=idx) conv_row = contact_row_cache.get(conv_username) conv_name = _pick_display_name(conv_row, conv_username) conv_is_group = bool(conv_username.endswith("@chatroom")) @@ -2848,11 +3105,17 @@ class ChatExportManager: "previewText": ("" if privacy_mode else str(preview_by_username.get(conv_username) or "")), } ) + _safe_trace( + trace, + "html_session_index_built", + durationMs=_elapsed_ms(phase_started), + sessionItems=len(session_items), + ) for idx, conv_username in enumerate(target_usernames, start=1): - if self._should_cancel(job): - raise _JobCancelled() + _raise_if_job_cancelled(job, "conversation_loop_start", trace, index=idx) + conv_started = time.perf_counter() conv_row = contact_row_cache.get(conv_username) conv_name = _pick_display_name(conv_row, conv_username) conv_is_group = bool(conv_username.endswith("@chatroom")) @@ -2867,6 +3130,7 @@ class ChatExportManager: job.progress.current_conversation_messages_total = 0 try: + phase_started = time.perf_counter() estimated_total = _estimate_conversation_message_count( account_dir=account_dir, conv_username=conv_username, @@ -2876,26 +3140,57 @@ class ChatExportManager: ) except Exception: estimated_total = 0 + _safe_trace( + trace, + "conversation_estimated", + index=idx, + conversation=conv_username, + displayName=conv_name, + durationMs=_elapsed_ms(phase_started), + estimatedTotal=estimated_total, + ) + _raise_if_job_cancelled(job, "conversation_estimated", trace, index=idx, conversation=conv_username) with self._lock: job.progress.current_conversation_messages_total = int(estimated_total) chat_id = None try: + phase_started = time.perf_counter() if resource_conn is not None: chat_id = _resource_lookup_chat_id(resource_conn, conv_username) except Exception: chat_id = None + _safe_trace( + trace, + "conversation_resource_lookup", + index=idx, + conversation=conv_username, + durationMs=_elapsed_ms(phase_started), + chatId=chat_id, + ) + _raise_if_job_cancelled(job, "conversation_resource_lookup", trace, index=idx, conversation=conv_username) conv_avatar_path = "" if not privacy_mode: + phase_started = time.perf_counter() conv_avatar_path = _materialize_avatar( zf=zf, head_image_conn=head_image_conn, username=conv_username, avatar_written=avatar_written, ) + _safe_trace( + trace, + "conversation_avatar_materialized", + index=idx, + conversation=conv_username, + durationMs=_elapsed_ms(phase_started), + hasAvatar=bool(conv_avatar_path), + ) + _raise_if_job_cancelled(job, "conversation_avatar_materialized", trace, index=idx, conversation=conv_username) + phase_started = time.perf_counter() if export_format == "txt": exported_count = _write_conversation_txt( zf=zf, @@ -2921,6 +3216,7 @@ class ChatExportManager: report=report, allow_process_key_extract=allow_process_key_extract, media_db_path=media_db_path, + media_index=media_index, job=job, lock=self._lock, ) @@ -2954,6 +3250,7 @@ class ChatExportManager: report=report, allow_process_key_extract=allow_process_key_extract, media_db_path=media_db_path, + media_index=media_index, job=job, lock=self._lock, ) @@ -2982,10 +3279,26 @@ class ChatExportManager: report=report, allow_process_key_extract=allow_process_key_extract, media_db_path=media_db_path, + media_index=media_index, job=job, lock=self._lock, ) + _safe_trace( + trace, + "conversation_writer_done", + index=idx, + conversation=conv_username, + displayName=conv_name, + format=export_format, + durationMs=_elapsed_ms(phase_started), + exportedCount=exported_count, + mediaCopied=job.progress.media_copied, + mediaMissing=job.progress.media_missing, + ) + _raise_if_job_cancelled(job, "conversation_writer_done", trace, index=idx, conversation=conv_username) + + phase_started = time.perf_counter() meta = { "schemaVersion": 1, "username": "" if privacy_mode else conv_username, @@ -3003,8 +3316,19 @@ class ChatExportManager: job.progress.current_conversation_messages_exported = int(exported_count) job.progress.current_conversation_messages_total = int(exported_count) job.progress.conversations_done += 1 + _safe_trace( + trace, + "conversation_done", + index=idx, + conversation=conv_username, + durationMs=_elapsed_ms(conv_started), + metaWriteMs=_elapsed_ms(phase_started), + conversationsDone=job.progress.conversations_done, + exportedCount=exported_count, + ) if export_format == "html": + phase_started = time.perf_counter() def esc_text(v: Any) -> str: return html.escape(str(v or ""), quote=False) @@ -3069,7 +3393,15 @@ class ChatExportManager: parts.append("\n") parts.append("\n") zf.writestr("index.html", "".join(parts)) + _safe_trace( + trace, + "html_index_written", + durationMs=_elapsed_ms(phase_started), + conversations=len(html_index_items), + ) + _raise_if_job_cancelled(job, "html_index_written", trace) + phase_started = time.perf_counter() manifest = { "schemaVersion": 1, "exportedAt": _now_iso(), @@ -3102,15 +3434,39 @@ class ChatExportManager: } zf.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2)) zf.writestr("report.json", json.dumps(report, ensure_ascii=False, indent=2)) + _safe_trace( + trace, + "manifest_written", + durationMs=_elapsed_ms(phase_started), + messagesExported=job.progress.messages_exported, + mediaCopied=job.progress.media_copied, + mediaMissing=job.progress.media_missing, + errors=len(report.get("errors") or []), + missingMedia=len(report.get("missingMedia") or []), + ) + _safe_trace(trace, "zip_closed", tmpZip=str(tmp_zip)) + _raise_if_job_cancelled(job, "before_finalize", trace) + + phase_started = time.perf_counter() if final_zip.exists(): final_zip = (exports_root / f"{final_zip.stem}_{job.export_id}{final_zip.suffix}").resolve() tmp_zip.replace(final_zip) + _safe_trace(trace, "zip_finalized", durationMs=_elapsed_ms(phase_started), finalZip=str(final_zip)) with self._lock: job.status = "done" job.zip_path = final_zip job.finished_at = time.time() + _safe_trace( + trace, + "job_done", + durationMs=round(((job.finished_at or time.time()) - (job.started_at or job.created_at)) * 1000.0, 1), + finalZip=str(final_zip), + messagesExported=job.progress.messages_exported, + mediaCopied=job.progress.media_copied, + mediaMissing=job.progress.media_missing, + ) except _JobCancelled: try: if tmp_zip.exists(): @@ -3120,7 +3476,33 @@ class ChatExportManager: with self._lock: job.status = "cancelled" job.finished_at = time.time() + _safe_trace( + trace, + "job_cancelled", + durationMs=round(((job.finished_at or time.time()) - (job.started_at or job.created_at)) * 1000.0, 1), + messagesExported=job.progress.messages_exported, + mediaCopied=job.progress.media_copied, + mediaMissing=job.progress.media_missing, + ) finally: + if realtime_paused: + try: + resume_depth = CHAT_REALTIME_AUTOSYNC.resume_account(account_dir.name, reason=realtime_pause_reason) + _safe_trace( + trace, + "realtime_autosync_resumed", + account=account_dir.name, + reason=realtime_pause_reason, + depth=int(resume_depth), + ) + except Exception: + logger.exception("failed to resume realtime autosync account=%s export_id=%s", account_dir.name, job.export_id) + _safe_trace( + trace, + "realtime_autosync_resume_failed", + account=account_dir.name, + reason=realtime_pause_reason, + ) try: if resource_conn is not None: resource_conn.close() @@ -3836,14 +4218,33 @@ def _write_conversation_json( report: dict[str, Any], allow_process_key_extract: bool, media_db_path: Path, + media_index: Optional[MediaPathIndex], job: ExportJob, lock: threading.Lock, ) -> int: arcname = f"{conv_dir}/messages.json" exported = 0 + _trace_id, trace = create_perf_trace( + logger, + "chat_export_conversation_writer", + exportId=job.export_id, + format="json", + conversation=conv_username, + ) + _safe_trace( + trace, + "writer_started", + convDir=conv_dir, + displayName=conv_name, + includeMedia=include_media, + mediaKinds=media_kinds, + privacyMode=privacy_mode, + messageTypes=sorted(want_types) if want_types else None, + ) contact_conn: Optional[sqlite3.Connection] = None alias_cache: dict[str, str] = {} + phase_started = time.perf_counter() if conv_is_group: try: contact_db_path = account_dir / "contact.db" @@ -3851,6 +4252,13 @@ def _write_conversation_json( contact_conn = sqlite3.connect(str(contact_db_path)) except Exception: contact_conn = None + _safe_trace( + trace, + "alias_db_ready", + durationMs=_elapsed_ms(phase_started), + isGroup=conv_is_group, + hasAliasDb=contact_conn is not None, + ) def lookup_alias(username: str) -> str: u = str(username or "").strip() @@ -3921,6 +4329,22 @@ def _write_conversation_json( local_types=local_types, ): scanned += 1 + _raise_if_job_cancelled( + job, + "json.scan", + trace, + conversation=conv_username, + scanned=scanned, + exported=exported, + ) + _log_writer_progress( + trace, + export_format="json", + job=job, + conv_username=conv_username, + scanned=scanned, + exported=exported, + ) sender_alias = "" if conv_is_group and row.raw_text and (not row.raw_text.startswith("<")) and (not row.raw_text.startswith('"<')): @@ -3936,6 +4360,7 @@ def _write_conversation_json( if not body_is_xml: sender_alias = lookup_alias(su) + phase_started = time.perf_counter() msg = _parse_message_for_export( row=row, conv_username=conv_username, @@ -3945,6 +4370,15 @@ def _write_conversation_json( sender_alias=sender_alias, resolve_display_name=resolve_display_name, ) + _log_export_slow_step( + "json.parse_message", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + localType=row.local_type, + serverId=row.server_id, + ) if not _is_render_type_selected(msg.get("renderType"), want_types): continue @@ -3953,6 +4387,7 @@ def _write_conversation_json( _privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map) else: msg["senderDisplayName"] = resolve_display_name(su) if su else "" + phase_started = time.perf_counter() msg["senderAvatarPath"] = ( _materialize_avatar( zf=zf, @@ -3963,8 +4398,17 @@ def _write_conversation_json( if (su and head_image_conn is not None) else "" ) + _log_export_slow_step( + "json.sender_avatar", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + sender=su, + ) if include_media: + phase_started = time.perf_counter() _attach_offline_media( zf=zf, account_dir=account_dir, @@ -3975,9 +4419,20 @@ def _write_conversation_json( media_kinds=media_kinds, allow_process_key_extract=allow_process_key_extract, media_db_path=media_db_path, + media_index=media_index, lock=lock, job=job, ) + _log_export_slow_step( + "json.attach_media", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + renderType=msg.get("renderType"), + localId=msg.get("localId"), + serverId=msg.get("serverId"), + ) if not first: tw.write(",\n") @@ -3989,20 +4444,30 @@ def _write_conversation_json( job.progress.messages_exported += 1 job.progress.current_conversation_messages_exported = exported - if scanned % 500 == 0 and job.cancel_requested: - raise _JobCancelled() - tw.write("\n ]\n") tw.write("}\n") tw.flush() + _log_writer_progress( + trace, + export_format="json", + job=job, + conv_username=conv_username, + scanned=scanned, + exported=exported, + force=True, + ) + _safe_trace(trace, "messages_temp_written", scanned=scanned, exported=exported) + phase_started = time.perf_counter() zf.write(str(tmp_path), arcname) + _safe_trace(trace, "zip_entry_written", durationMs=_elapsed_ms(phase_started), arcname=arcname) if contact_conn is not None: try: contact_conn.close() except Exception: pass + _safe_trace(trace, "writer_done", exported=exported) return exported @@ -4031,14 +4496,33 @@ def _write_conversation_txt( report: dict[str, Any], allow_process_key_extract: bool, media_db_path: Path, + media_index: Optional[MediaPathIndex], job: ExportJob, lock: threading.Lock, ) -> int: arcname = f"{conv_dir}/messages.txt" exported = 0 + _trace_id, trace = create_perf_trace( + logger, + "chat_export_conversation_writer", + exportId=job.export_id, + format="txt", + conversation=conv_username, + ) + _safe_trace( + trace, + "writer_started", + convDir=conv_dir, + displayName=conv_name, + includeMedia=include_media, + mediaKinds=media_kinds, + privacyMode=privacy_mode, + messageTypes=sorted(want_types) if want_types else None, + ) contact_conn: Optional[sqlite3.Connection] = None alias_cache: dict[str, str] = {} + phase_started = time.perf_counter() if conv_is_group: try: contact_db_path = account_dir / "contact.db" @@ -4046,6 +4530,13 @@ def _write_conversation_txt( contact_conn = sqlite3.connect(str(contact_db_path)) except Exception: contact_conn = None + _safe_trace( + trace, + "alias_db_ready", + durationMs=_elapsed_ms(phase_started), + isGroup=conv_is_group, + hasAliasDb=contact_conn is not None, + ) def lookup_alias(username: str) -> str: u = str(username or "").strip() @@ -4101,6 +4592,22 @@ def _write_conversation_txt( local_types=local_types, ): scanned += 1 + _raise_if_job_cancelled( + job, + "txt.scan", + trace, + conversation=conv_username, + scanned=scanned, + exported=exported, + ) + _log_writer_progress( + trace, + export_format="txt", + job=job, + conv_username=conv_username, + scanned=scanned, + exported=exported, + ) sender_alias = "" if conv_is_group and row.raw_text and (not row.raw_text.startswith("<")) and (not row.raw_text.startswith('"<')): sep = row.raw_text.find(":\n") @@ -4115,6 +4622,7 @@ def _write_conversation_txt( if not body_is_xml: sender_alias = lookup_alias(su) + phase_started = time.perf_counter() msg = _parse_message_for_export( row=row, conv_username=conv_username, @@ -4124,6 +4632,15 @@ def _write_conversation_txt( sender_alias=sender_alias, resolve_display_name=resolve_display_name, ) + _log_export_slow_step( + "txt.parse_message", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + localType=row.local_type, + serverId=row.server_id, + ) if not _is_render_type_selected(msg.get("renderType"), want_types): continue @@ -4132,6 +4649,7 @@ def _write_conversation_txt( _privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map) else: msg["senderDisplayName"] = resolve_display_name(su) if su else "" + phase_started = time.perf_counter() msg["senderAvatarPath"] = ( _materialize_avatar( zf=zf, @@ -4142,8 +4660,17 @@ def _write_conversation_txt( if (su and head_image_conn is not None) else "" ) + _log_export_slow_step( + "txt.sender_avatar", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + sender=su, + ) if include_media: + phase_started = time.perf_counter() _attach_offline_media( zf=zf, account_dir=account_dir, @@ -4154,9 +4681,20 @@ def _write_conversation_txt( media_kinds=media_kinds, allow_process_key_extract=allow_process_key_extract, media_db_path=media_db_path, + media_index=media_index, lock=lock, job=job, ) + _log_export_slow_step( + "txt.attach_media", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + renderType=msg.get("renderType"), + localId=msg.get("localId"), + serverId=msg.get("serverId"), + ) tw.write(_format_message_line_txt(msg=msg) + "\n") @@ -4165,18 +4703,28 @@ def _write_conversation_txt( job.progress.messages_exported += 1 job.progress.current_conversation_messages_exported = exported - if scanned % 500 == 0 and job.cancel_requested: - raise _JobCancelled() - tw.flush() + _log_writer_progress( + trace, + export_format="txt", + job=job, + conv_username=conv_username, + scanned=scanned, + exported=exported, + force=True, + ) + _safe_trace(trace, "messages_temp_written", scanned=scanned, exported=exported) + phase_started = time.perf_counter() zf.write(str(tmp_path), arcname) + _safe_trace(trace, "zip_entry_written", durationMs=_elapsed_ms(phase_started), arcname=arcname) if contact_conn is not None: try: contact_conn.close() except Exception: pass + _safe_trace(trace, "writer_done", exported=exported) return exported @@ -4210,11 +4758,32 @@ def _write_conversation_html( report: dict[str, Any], allow_process_key_extract: bool, media_db_path: Path, + media_index: Optional[MediaPathIndex], job: ExportJob, lock: threading.Lock, ) -> int: arcname = f"{conv_dir}/messages.html" exported = 0 + _trace_id, trace = create_perf_trace( + logger, + "chat_export_conversation_writer", + exportId=job.export_id, + format="html", + conversation=conv_username, + ) + _safe_trace( + trace, + "writer_started", + convDir=conv_dir, + displayName=conv_name, + includeMedia=include_media, + mediaKinds=media_kinds, + privacyMode=privacy_mode, + messageTypes=sorted(want_types) if want_types else None, + downloadRemoteMedia=download_remote_media, + htmlPageSize=html_page_size, + sessionItems=len(session_items), + ) rel_root = "../../" css_href = rel_root + "assets/wechat-chat-export.css" @@ -4551,6 +5120,7 @@ def _write_conversation_html( file_id="", media_written=media_written, suggested_name="", + media_index=media_index, ) if arc: break @@ -4873,7 +5443,24 @@ def _write_conversation_html( local_types=local_types, ): scanned += 1 + _raise_if_job_cancelled( + job, + "html.scan", + trace, + conversation=conv_username, + scanned=scanned, + exported=exported, + ) + _log_writer_progress( + trace, + export_format="html", + job=job, + conv_username=conv_username, + scanned=scanned, + exported=exported, + ) + phase_started = time.perf_counter() msg = _parse_message_for_export( row=row, conv_username=conv_username, @@ -4883,6 +5470,15 @@ def _write_conversation_html( sender_alias="", resolve_display_name=resolve_display_name, ) + _log_export_slow_step( + "html.parse_message", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + localType=row.local_type, + serverId=row.server_id, + ) if not _is_render_type_selected(msg.get("renderType"), want_types): continue @@ -4891,6 +5487,7 @@ def _write_conversation_html( _privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map) else: msg["senderDisplayName"] = resolve_display_name(sender_username) if sender_username else "" + phase_started = time.perf_counter() msg["senderAvatarPath"] = ( _materialize_avatar( zf=zf, @@ -4901,8 +5498,17 @@ def _write_conversation_html( if (sender_username and head_image_conn is not None) else "" ) + _log_export_slow_step( + "html.sender_avatar", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + sender=sender_username, + ) if include_media: + phase_started = time.perf_counter() _attach_offline_media( zf=zf, account_dir=account_dir, @@ -4913,10 +5519,21 @@ def _write_conversation_html( media_kinds=media_kinds, allow_process_key_extract=allow_process_key_extract, media_db_path=media_db_path, + media_index=media_index, lock=lock, job=job, ) _remember_offline_media(msg) + _log_export_slow_step( + "html.attach_media", + phase_started, + exportId=job.export_id, + conversation=conv_username, + scanned=scanned, + renderType=msg.get("renderType"), + localId=msg.get("localId"), + serverId=msg.get("serverId"), + ) rt = str(msg.get("renderType") or "text").strip() or "text" create_time_text = str(msg.get("createTimeText") or "").strip() @@ -5262,6 +5879,7 @@ def _write_conversation_html( file_id="", media_written=media_written, suggested_name="", + media_index=media_index, ) except Exception: arc, is_new = "", False @@ -5506,9 +6124,6 @@ def _write_conversation_html( if ts: prev_ts = ts - if scanned % 500 == 0 and job.cancel_requested: - raise _JobCancelled() - if page_size > 0: _close_page_fp() paged_total_pages = max(1, len(page_frag_paths)) @@ -5567,11 +6182,38 @@ def _write_conversation_html( tw.write("\n") tw.write("\n") tw.flush() + _log_writer_progress( + trace, + export_format="html", + job=job, + conv_username=conv_username, + scanned=scanned, + exported=exported, + force=True, + ) + _safe_trace( + trace, + "messages_temp_written", + scanned=scanned, + exported=exported, + pagedFragments=len(page_frag_paths), + ) + phase_started = time.perf_counter() zf.write(str(tmp_path), arcname) + _safe_trace(trace, "zip_entry_written", durationMs=_elapsed_ms(phase_started), arcname=arcname) if page_size > 0 and paged_old_page_paths: + phase_started = time.perf_counter() for page_no, frag_path in enumerate(paged_old_page_paths, start=1): + _raise_if_job_cancelled( + job, + "html.page_fragment_write", + trace, + conversation=conv_username, + page=page_no, + totalPages=len(paged_old_page_paths), + ) try: frag_text = frag_path.read_text(encoding="utf-8") except Exception: @@ -5602,7 +6244,14 @@ def _write_conversation_html( "})();\n" ) zf.writestr(arc_js, js_payload) + _safe_trace( + trace, + "page_fragments_written", + durationMs=_elapsed_ms(phase_started), + fragments=len(paged_old_page_paths), + ) + _safe_trace(trace, "writer_done", exported=exported) return exported @@ -5759,6 +6408,7 @@ def _attach_offline_media( media_kinds: list[MediaKind], allow_process_key_extract: bool, media_db_path: Path, + media_index: Optional[MediaPathIndex], lock: threading.Lock, job: ExportJob, ) -> None: @@ -5766,6 +6416,14 @@ def _attach_offline_media( _ = allow_process_key_extract rt = str(msg.get("renderType") or "") + _raise_if_job_cancelled( + job, + "attach_offline_media.start", + conversation=conv_username, + renderType=rt, + messageId=msg.get("id"), + serverId=msg.get("serverId"), + ) def record_missing(kind: str, ident: str) -> None: with lock: @@ -5830,6 +6488,7 @@ def _attach_offline_media( file_id="", media_written=media_written, suggested_name="", + media_index=media_index, ) if arc: used_md5 = md5 @@ -5846,6 +6505,7 @@ def _attach_offline_media( file_id=file_id, media_written=media_written, suggested_name="", + media_index=media_index, ) if arc: used_file_id = file_id @@ -5879,6 +6539,7 @@ def _attach_offline_media( file_id="", media_written=media_written, suggested_name="", + media_index=media_index, ) if arc: offline.append({"kind": "emoji", "path": arc, "md5": md5}) @@ -5901,6 +6562,7 @@ def _attach_offline_media( file_id=file_id, media_written=media_written, suggested_name="", + media_index=media_index, ) if arc: offline.append({"kind": "video_thumb", "path": arc, "md5": md5, "fileId": file_id}) @@ -5922,6 +6584,7 @@ def _attach_offline_media( file_id=file_id, media_written=media_written, suggested_name="", + media_index=media_index, ) if arc: offline.append({"kind": "video", "path": arc, "md5": md5, "fileId": file_id}) @@ -5959,6 +6622,7 @@ def _attach_offline_media( file_id="", media_written=media_written, suggested_name=str(msg.get("title") or "").strip(), + media_index=media_index, ) if arc: offline.append({"kind": "file", "path": arc, "md5": md5, "title": str(msg.get("title") or "").strip()}) @@ -5979,6 +6643,7 @@ def _materialize_avatar( username: str, avatar_written: dict[str, str], ) -> str: + started_at = time.perf_counter() u = str(username or "").strip() if not u or head_image_conn is None: return "" @@ -6030,6 +6695,13 @@ def _materialize_avatar( return "" avatar_written[key] = arc + _log_export_slow_step( + "materialize_avatar", + started_at, + username=u, + arc=arc, + bytes=len(data), + ) return arc @@ -6040,6 +6712,7 @@ def _materialize_voice( server_id: int, media_written: dict[str, str], ) -> tuple[str, bool]: + started_at = time.perf_counter() key = f"voice:{int(server_id)}" existing = media_written.get(key) if existing: @@ -6068,11 +6741,24 @@ def _materialize_voice( payload, ext, _media_type = _convert_silk_to_browser_audio(data, preferred_format="mp3") if not payload: + _log_export_slow_step( + "materialize_voice_failed", + started_at, + serverId=server_id, + reason="convert_failed", + ) return "", False arc = f"media/voices/voice_{int(server_id)}.{ext}" zf.writestr(arc, payload) media_written[key] = arc + _log_export_slow_step( + "materialize_voice", + started_at, + serverId=server_id, + arc=arc, + bytes=len(payload), + ) return arc, True @@ -6086,30 +6772,102 @@ def _materialize_media( file_id: str, media_written: dict[str, str], suggested_name: str, + media_index: Optional[MediaPathIndex], ) -> tuple[str, bool]: + started_at = time.perf_counter() ident = md5 or file_id if not ident: return "", False key = f"{kind}:{ident}" - existing = media_written.get(key) - if existing: - return existing, False + if key in media_written: + return media_written.get(key) or "", False src: Optional[Path] = None + resolved_via_index = False + backfill_index = False + known_missing = False + if media_index is not None: + try: + known_missing = media_index.is_known_missing( + kind=str(kind), + md5=str(md5 or "").strip().lower(), + file_id=str(file_id or "").strip(), + username=str(conv_username or "").strip(), + ) + except Exception: + known_missing = False + allow_fallback_scan = kind != "emoji" + if media_index is not None and kind in {"image", "video", "video_thumb", "file"}: + allow_fallback_scan = False + if known_missing: + allow_fallback_scan = False + allow_file_id_fallback = bool(file_id) and not known_missing + if media_index is not None and kind in {"image", "video", "video_thumb", "file"}: + allow_file_id_fallback = False if md5 and _is_md5(md5): + cache_lookup_started = time.perf_counter() try: src = _try_find_decrypted_resource(account_dir, md5) except Exception: src = None + _log_export_slow_step( + "materialize_media_cache_lookup", + cache_lookup_started, + kind=kind, + ident=ident, + conversation=conv_username, + hit=bool(src), + ) - if src is None: - try: - src = _resolve_media_path_for_kind(account_dir, kind=kind, md5=md5, username=conv_username) - except Exception: - src = None + if src is None and media_index is not None: + index_lookup_started = time.perf_counter() + try: + src = media_index.resolve( + kind=str(kind), + md5=str(md5 or "").strip().lower(), + file_id=str(file_id or "").strip(), + username=str(conv_username or "").strip(), + ) + resolved_via_index = bool(src) + except Exception: + src = None + _log_export_slow_step( + "materialize_media_index_lookup", + index_lookup_started, + kind=kind, + ident=ident, + conversation=conv_username, + hit=bool(src), + hasMd5=bool(md5 and _is_md5(md5)), + hasFileId=bool(file_id), + knownMissing=bool(known_missing), + ) - if src is None and file_id: + if src is None and md5 and _is_md5(md5): + resolve_started = time.perf_counter() + try: + src = _resolve_media_path_for_kind( + account_dir, + kind=kind, + md5=md5, + username=conv_username, + allow_fallback_scan=False, + ) + except Exception: + src = None + _log_export_slow_step( + "materialize_media_resolve_md5", + resolve_started, + kind=kind, + ident=ident, + conversation=conv_username, + hit=bool(src), + fallbackScan=False, + ) + + if src is None and file_id and media_index is None: + file_id_lookup_started = time.perf_counter() try: wxid_dir = _resolve_account_wxid_dir(account_dir) db_storage_dir = _resolve_account_db_storage_dir(account_dir) @@ -6127,8 +6885,101 @@ def _materialize_media( break except Exception: src = None + _log_export_slow_step( + "materialize_media_resolve_file_id", + file_id_lookup_started, + kind=kind, + ident=ident, + conversation=conv_username, + hit=bool(src), + ) + + if src is None and md5 and _is_md5(md5) and allow_fallback_scan: + fallback_md5_started = time.perf_counter() + try: + src = _resolve_media_path_for_kind( + account_dir, + kind=kind, + md5=md5, + username=conv_username, + allow_fallback_scan=True, + ) + except Exception: + src = None + backfill_index = bool(src) + _log_export_slow_step( + "materialize_media_resolve_md5_fallback", + fallback_md5_started, + kind=kind, + ident=ident, + conversation=conv_username, + hit=bool(src), + fallbackScan=True, + ) + + if src is None and allow_file_id_fallback: + file_id_lookup_started = time.perf_counter() + try: + wxid_dir = _resolve_account_wxid_dir(account_dir) + db_storage_dir = _resolve_account_db_storage_dir(account_dir) + for r in [wxid_dir, db_storage_dir]: + if not r: + continue + hit = _fallback_search_media_by_file_id( + str(r), + str(file_id), + kind=str(kind), + username=str(conv_username or ""), + ) + if hit: + src = Path(hit) + break + except Exception: + src = None + backfill_index = bool(src) + _log_export_slow_step( + "materialize_media_resolve_file_id", + file_id_lookup_started, + kind=kind, + ident=ident, + conversation=conv_username, + hit=bool(src), + fallbackScan=True, + ) + + if src is not None and media_index is not None and backfill_index and not resolved_via_index: + try: + media_index.remember_path( + kind=str(kind), + path=src, + username=str(conv_username or "").strip(), + ) + except Exception: + pass if not src: + if media_index is not None: + try: + media_index.mark_missing( + kind=str(kind), + md5=str(md5 or "").strip().lower(), + file_id=str(file_id or "").strip(), + username=str(conv_username or "").strip(), + ) + except Exception: + pass + media_written[key] = "" + _log_export_slow_step( + "materialize_media_miss", + started_at, + kind=kind, + ident=ident, + conversation=conv_username, + fallbackScan=bool(allow_fallback_scan), + fileIdFallback=bool(allow_file_id_fallback), + knownMissing=bool(known_missing), + lookupMode=("md5" if md5 else "file_id"), + ) return "", False try: @@ -6237,6 +7088,21 @@ def _materialize_media( return "", False media_written[key] = arc + try: + src_size = int(src.stat().st_size) + except Exception: + src_size = 0 + _log_export_slow_step( + "materialize_media", + started_at, + kind=kind, + ident=ident, + conversation=conv_username, + src=str(src), + arc=arc, + bytes=src_size, + streamed=bool(should_stream_copy or (kind not in {"image", "emoji", "video", "video_thumb"})), + ) return arc, True diff --git a/tests/test_chat_export_cancel_responsiveness.py b/tests/test_chat_export_cancel_responsiveness.py new file mode 100644 index 0000000..4c81207 --- /dev/null +++ b/tests/test_chat_export_cancel_responsiveness.py @@ -0,0 +1,163 @@ +import importlib +import sys +import threading +import unittest +import zipfile +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestChatExportCancelResponsiveness(unittest.TestCase): + def _reload_export_module(self): + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(chat_export_service) + return chat_export_service + + def test_json_writer_checks_cancel_between_messages(self): + svc = self._reload_export_module() + job = svc.ExportJob(export_id="exp_cancel", account="wxid_test", status="running") + rows = [ + svc._Row( + db_stem="message_0", + table_name="msg_demo", + local_id=1, + server_id=1001, + local_type=1, + sort_seq=1, + create_time=1735689601, + raw_text="第一条", + sender_username="wxid_friend", + is_sent=False, + ), + svc._Row( + db_stem="message_0", + table_name="msg_demo", + local_id=2, + server_id=1002, + local_type=1, + sort_seq=2, + create_time=1735689602, + raw_text="第二条", + sender_username="wxid_friend", + is_sent=False, + ), + ] + + original_iter = svc._iter_rows_for_conversation + original_parse = svc._parse_message_for_export + try: + def fake_iter_rows_for_conversation(**_kwargs): + yield rows[0] + job.cancel_requested = True + yield rows[1] + + def fake_parse_message_for_export(**kwargs): + row = kwargs["row"] + return { + "id": f"{row.db_stem}:{row.table_name}:{row.local_id}", + "localId": row.local_id, + "serverId": row.server_id, + "createTime": row.create_time, + "createTimeText": "2025-01-01 08:00:00", + "sortSeq": row.sort_seq, + "type": row.local_type, + "renderType": "text", + "isSent": bool(row.is_sent), + "senderUsername": row.sender_username, + "conversationUsername": kwargs["conv_username"], + "isGroup": False, + "content": row.raw_text, + "title": "", + "url": "", + "from": "", + "fromUsername": "", + "linkType": "", + "linkStyle": "", + "objectId": "", + "objectNonceId": "", + "recordItem": "", + "thumbUrl": "", + "imageMd5": "", + "imageFileId": "", + "imageMd5Candidates": [], + "imageFileIdCandidates": [], + "imageUrl": "", + "emojiMd5": "", + "emojiUrl": "", + "videoMd5": "", + "videoThumbMd5": "", + "videoFileId": "", + "videoThumbFileId": "", + "videoUrl": "", + "videoThumbUrl": "", + "voiceLength": "", + "quoteUsername": "", + "quoteServerId": "", + "quoteType": "", + "quoteThumbUrl": "", + "quoteVoiceLength": "", + "quoteTitle": "", + "quoteContent": "", + "amount": "", + "coverUrl": "", + "fileSize": "", + "fileMd5": "", + "paySubType": "", + "transferStatus": "", + "transferId": "", + "voipType": "", + "locationLat": None, + "locationLng": None, + "locationPoiname": "", + "locationLabel": "", + } + + svc._iter_rows_for_conversation = fake_iter_rows_for_conversation + svc._parse_message_for_export = fake_parse_message_for_export + + with TemporaryDirectory() as td: + zip_path = Path(td) / "out.zip" + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + with self.assertRaises(svc._JobCancelled): + svc._write_conversation_json( + zf=zf, + conv_dir="conversations/demo", + account_dir=Path(td), + conv_username="wxid_friend", + conv_name="测试好友", + conv_avatar_path="", + conv_is_group=False, + start_time=None, + end_time=None, + want_types=None, + local_types=None, + resource_conn=None, + resource_chat_id=None, + head_image_conn=None, + resolve_display_name=lambda username: username, + privacy_mode=False, + include_media=False, + media_kinds=[], + media_written={}, + avatar_written={}, + report={"errors": [], "missingMedia": []}, + allow_process_key_extract=False, + media_db_path=Path(td) / "media_0.db", + media_index=None, + job=job, + lock=threading.Lock(), + ) + self.assertEqual(job.progress.messages_exported, 1) + self.assertTrue(job.cancel_requested) + finally: + svc._iter_rows_for_conversation = original_iter + svc._parse_message_for_export = original_parse + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_chat_export_emoji_fast_miss.py b/tests/test_chat_export_emoji_fast_miss.py new file mode 100644 index 0000000..e9df504 --- /dev/null +++ b/tests/test_chat_export_emoji_fast_miss.py @@ -0,0 +1,130 @@ +import importlib +import io +import unittest +import zipfile +from pathlib import Path +from tempfile import TemporaryDirectory + + +class TestChatExportEmojiFastMiss(unittest.TestCase): + def _reload_export_module(self): + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(chat_export_service) + return chat_export_service + + def test_emoji_miss_skips_fallback_scan_and_caches_negative_result(self): + svc = self._reload_export_module() + md5 = "8f436b616da7832e5c206ba3d781a714" + calls: list[dict[str, object]] = [] + + original_try_find = svc._try_find_decrypted_resource + original_resolve_kind = svc._resolve_media_path_for_kind + try: + svc._try_find_decrypted_resource = lambda *args, **kwargs: None + + def fake_resolve_kind(account_dir, kind, md5, username, allow_fallback_scan=True): + calls.append( + { + "account_dir": account_dir, + "kind": kind, + "md5": md5, + "username": username, + "allow_fallback_scan": allow_fallback_scan, + } + ) + return None + + svc._resolve_media_path_for_kind = fake_resolve_kind + + with TemporaryDirectory() as td: + media_written: dict[str, str] = {} + with io.BytesIO() as buf, zipfile.ZipFile(buf, "w") as zf: + arc1, is_new1 = svc._materialize_media( + zf=zf, + account_dir=Path(td), + conv_username="room@chatroom", + kind="emoji", + md5=md5, + file_id="", + media_written=media_written, + suggested_name="", + media_index=None, + ) + arc2, is_new2 = svc._materialize_media( + zf=zf, + account_dir=Path(td), + conv_username="room@chatroom", + kind="emoji", + md5=md5, + file_id="", + media_written=media_written, + suggested_name="", + media_index=None, + ) + + self.assertEqual(arc1, "") + self.assertEqual(arc2, "") + self.assertFalse(is_new1) + self.assertFalse(is_new2) + self.assertEqual(len(calls), 1) + self.assertEqual(calls[0]["kind"], "emoji") + self.assertEqual(calls[0]["md5"], md5) + self.assertEqual(calls[0]["username"], "room@chatroom") + self.assertFalse(bool(calls[0]["allow_fallback_scan"])) + self.assertIn("emoji:" + md5, media_written) + self.assertEqual(media_written["emoji:" + md5], "") + finally: + svc._try_find_decrypted_resource = original_try_find + svc._resolve_media_path_for_kind = original_resolve_kind + + def test_image_lookup_keeps_fallback_scan_enabled(self): + svc = self._reload_export_module() + md5 = "80793a35a19810699a03579c654f4c50" + calls: list[dict[str, object]] = [] + + original_try_find = svc._try_find_decrypted_resource + original_resolve_kind = svc._resolve_media_path_for_kind + try: + svc._try_find_decrypted_resource = lambda *args, **kwargs: None + + def fake_resolve_kind(account_dir, kind, md5, username, allow_fallback_scan=True): + calls.append( + { + "kind": kind, + "md5": md5, + "allow_fallback_scan": allow_fallback_scan, + } + ) + return None + + svc._resolve_media_path_for_kind = fake_resolve_kind + + with TemporaryDirectory() as td: + with io.BytesIO() as buf, zipfile.ZipFile(buf, "w") as zf: + arc, is_new = svc._materialize_media( + zf=zf, + account_dir=Path(td), + conv_username="friend", + kind="image", + md5=md5, + file_id="", + media_written={}, + suggested_name="", + media_index=None, + ) + + self.assertEqual(arc, "") + self.assertFalse(is_new) + self.assertEqual(len(calls), 2) + self.assertEqual(calls[0]["kind"], "image") + self.assertFalse(bool(calls[0]["allow_fallback_scan"])) + self.assertEqual(calls[1]["kind"], "image") + self.assertTrue(bool(calls[1]["allow_fallback_scan"])) + finally: + svc._try_find_decrypted_resource = original_try_find + svc._resolve_media_path_for_kind = original_resolve_kind + + +if __name__ == "__main__": + unittest.main()