Compare commits

...

3 Commits

9 changed files with 1400 additions and 219 deletions
+7 -5
View File
@@ -128,7 +128,7 @@
<button
type="button"
class="search-sidebar-close"
@click="closeMessageSearch"
@click="closeMessageSearch('close-button')"
title="关闭搜索 (Esc)"
>
<svg class="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24">
@@ -175,9 +175,9 @@
:class="{ 'privacy-blur': privacyMode }"
@focus="searchInputFocused = true"
@blur="searchInputFocused = false"
@keydown.enter.exact.prevent="runMessageSearch({ reset: true })"
@keydown.enter.exact.prevent="runMessageSearch({ reset: true, source: 'input-enter' })"
@keydown.enter.shift.prevent="onSearchPrev"
@keydown.escape="closeMessageSearch"
@keydown.escape="closeMessageSearch('input-escape')"
/>
<!-- 清除按钮 -->
@@ -185,7 +185,7 @@
v-if="messageSearchQuery"
type="button"
class="search-clear-inline"
@click="messageSearchQuery = ''; runMessageSearch({ reset: true })"
@click="messageSearchQuery = ''; runMessageSearch({ reset: true, source: 'clear-button' })"
>
<svg class="w-3.5 h-3.5" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M6 18L18 6M6 6l12 12"/>
@@ -197,7 +197,7 @@
type="button"
class="search-btn-inline"
:disabled="messageSearchLoading"
@click="runMessageSearch({ reset: true })"
@click="runMessageSearch({ reset: true, source: 'search-button' })"
>
<svg v-if="messageSearchLoading" class="animate-spin w-4 h-4" fill="none" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
@@ -428,6 +428,8 @@
:key="hit.id + ':' + idx"
class="sidebar-result-card"
:class="{ 'sidebar-result-card-selected': idx === messageSearchSelectedIndex }"
@pointerdown="onSearchHitPointerDown(hit, idx, $event)"
@click.capture="onSearchHitClickCapture(hit, idx, $event)"
@click="onSearchHitClick(hit, idx)"
>
<div class="sidebar-result-row">
+445 -31
View File
@@ -43,6 +43,74 @@ export const useChatSearch = ({
selectContact,
loadMoreMessages
}) => {
const isDesktopRenderer = () => {
if (!process.client || typeof window === 'undefined') return false
return !!window.wechatDesktop?.__brand
}
const logSearchPhase = (phase, details = {}) => {
const payload = {
account: String(selectedAccount.value || '').trim(),
selectedUsername: String(selectedContact.value?.username || '').trim(),
contextUsername: String(searchContext.value?.username || '').trim(),
...details
}
if (isDesktopRenderer()) {
try {
window.wechatDesktop?.logDebug?.('chat-search', phase, payload)
} catch {}
}
console.info(`[chat-search] ${phase}`, payload)
}
const describeEventTarget = (target) => {
if (!target || typeof target !== 'object') return ''
const nodeName = String(target.nodeName || '').trim().toLowerCase()
let cls = ''
try {
cls = typeof target.className === 'string'
? target.className
: String(target.className?.baseVal || '')
} catch {
cls = ''
}
cls = String(cls || '').trim().replace(/\s+/g, '.')
if (!nodeName) return cls.slice(0, 120)
if (!cls) return nodeName
return `${nodeName}.${cls}`.slice(0, 120)
}
const summarizeHitForLog = (hit, idx) => ({
index: Number(idx ?? -1),
hitId: String(hit?.id || '').trim(),
hitUsername: String(hit?.username || '').trim(),
conversationName: String(hit?.conversationName || '').trim(),
senderUsername: String(hit?.senderUsername || '').trim(),
renderType: String(hit?.renderType || '').trim(),
createTime: Number(hit?.createTime || 0),
isSent: !!hit?.isSent
})
const buildRunSearchLogDetails = ({ source = 'unknown', reset = false } = {}) => {
const q = String(messageSearchQuery.value || '').trim()
return {
source: String(source || 'unknown'),
reset: !!reset,
scope: String(messageSearchScope.value || 'conversation'),
queryLength: q.length,
selectedContactUsername: String(selectedContact.value?.username || '').trim(),
sender: String(messageSearchSender.value || '').trim(),
sessionType: String(messageSearchSessionType.value || '').trim(),
rangeDays: String(messageSearchRangeDays.value || '').trim(),
startDate: String(messageSearchStartDate.value || '').trim(),
endDate: String(messageSearchEndDate.value || '').trim(),
offset: Number(messageSearchOffset.value || 0),
resultCount: Number(messageSearchResults.value?.length || 0)
}
}
const messageSearchOpen = ref(false)
const messageSearchQuery = ref('')
const messageSearchScope = ref('global') // conversation | global
@@ -129,7 +197,7 @@ try {
// 应用搜索历史
const applySearchHistory = async (query) => {
messageSearchQuery.value = query
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'history-apply' })
}
const messageSearchIndexExists = computed(() => !!messageSearchIndexInfo.value?.exists)
@@ -274,11 +342,22 @@ closeMessageSearchSenderDropdown()
const fetchMessageSearchIndexStatus = async () => {
if (!selectedAccount.value) return null
logSearchPhase('search-index-status:start', {
queryLength: String(messageSearchQuery.value || '').trim().length
})
try {
const resp = await api.getChatSearchIndexStatus({ account: selectedAccount.value })
messageSearchIndexInfo.value = resp?.index || null
logSearchPhase('search-index-status:end', {
exists: !!messageSearchIndexInfo.value?.exists,
ready: !!messageSearchIndexInfo.value?.ready,
buildStatus: String(messageSearchIndexInfo.value?.build?.status || '').trim()
})
return messageSearchIndexInfo.value
} catch (e) {
logSearchPhase('search-index-status:error', {
error: String(e?.message || e || '')
})
return null
}
}
@@ -288,6 +367,7 @@ messageSearchSenderError.value = ''
if (!selectedAccount.value) {
messageSearchSenderOptions.value = []
messageSearchSenderOptionsKey.value = ''
logSearchPhase('search-senders:skip:no-account')
return []
}
@@ -303,6 +383,9 @@ if (scope === 'conversation') {
if (!selectedContact.value?.username) {
messageSearchSenderOptions.value = []
messageSearchSenderOptionsKey.value = ''
logSearchPhase('search-senders:skip:no-selected-contact', {
scope
})
return []
}
params.username = selectedContact.value.username
@@ -310,6 +393,10 @@ if (scope === 'conversation') {
if (msgQ.length < 2) {
messageSearchSenderOptions.value = []
messageSearchSenderOptionsKey.value = ''
logSearchPhase('search-senders:skip:query-too-short', {
scope,
queryLength: msgQ.length
})
return []
}
}
@@ -348,10 +435,21 @@ if (scope === 'global') {
}
messageSearchSenderLoading.value = true
logSearchPhase('search-senders:start', {
scope,
queryLength: msgQ.length,
username: String(params.username || '').trim()
})
try {
const resp = await api.listChatSearchSenders(params)
const status = String(resp?.status || 'success')
if (status !== 'success') {
logSearchPhase('search-senders:non-success', {
scope,
status,
queryLength: msgQ.length,
message: String(resp?.message || '')
})
if (status !== 'index_building') {
messageSearchSenderError.value = String(resp?.message || '加载发送者失败')
}
@@ -366,11 +464,22 @@ try {
if (cur && !list.some((s) => String(s?.username || '').trim() === cur)) {
messageSearchSender.value = ''
}
logSearchPhase('search-senders:end', {
scope,
queryLength: msgQ.length,
username: String(params.username || '').trim(),
optionCount: list.length
})
return list
} catch (e) {
messageSearchSenderError.value = e?.message || '加载发送者失败'
messageSearchSenderOptions.value = []
messageSearchSenderOptionsKey.value = ''
logSearchPhase('search-senders:error', {
scope,
queryLength: msgQ.length,
error: String(e?.message || e || '')
})
return []
} finally {
messageSearchSenderLoading.value = false
@@ -401,7 +510,7 @@ messageSearchIndexPollTimer = setInterval(async () => {
await fetchMessageSearchSenders()
}
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'index-poll-ready' })
}
}
}, 1200)
@@ -457,6 +566,33 @@ if (scope === 'global') {
}
return (text.charAt(0) || '?').toString()
}
const buildTransientSearchTargetContact = ({ username, displayName = '', avatar = '', isGroup = null } = {}) => {
const u = String(username || '').trim()
if (!u) return null
const name = String(displayName || u).trim() || u
return {
id: u,
username: u,
name,
avatar: String(avatar || '').trim() || null,
avatarColor: '#4B5563',
lastMessage: '',
lastMessageTime: '',
unreadCount: 0,
isGroup: typeof isGroup === 'boolean' ? isGroup : u.endsWith('@chatroom'),
isTop: false
}
}
const resolveSearchTargetContact = ({ username, displayName = '', avatar = '', isGroup = null } = {}) => {
const u = String(username || '').trim()
if (!u) return null
const existing = contacts.value.find((c) => String(c?.username || '').trim() === u)
if (existing) return existing
if (String(selectedContact.value?.username || '').trim() === u) return selectedContact.value
return buildTransientSearchTargetContact({ username: u, displayName, avatar, isGroup })
}
const searchContextBannerText = computed(() => {
if (!searchContext.value?.active) return ''
const kind = String(searchContext.value.kind || 'search')
@@ -596,7 +732,13 @@ for (let i = 0; i < 42; i++) {
}
return out
})
const closeMessageSearch = () => {
const closeMessageSearch = (reason = 'manual') => {
logSearchPhase('message-search:close', {
reason: String(reason || 'manual'),
queryLength: String(messageSearchQuery.value || '').trim().length,
resultCount: Number(messageSearchResults.value?.length || 0),
selectedIndex: Number(messageSearchSelectedIndex.value ?? -1)
})
messageSearchOpen.value = false
closeMessageSearchSenderDropdown()
messageSearchError.value = ''
@@ -771,29 +913,47 @@ if (messageSearchScope.value === 'conversation' && !selectedContact.value) {
}
const toggleMessageSearch = async () => {
messageSearchOpen.value = !messageSearchOpen.value
const nextOpen = !messageSearchOpen.value
logSearchPhase('message-search:toggle', {
nextOpen,
queryLength: String(messageSearchQuery.value || '').trim().length,
resultCount: Number(messageSearchResults.value?.length || 0)
})
messageSearchOpen.value = nextOpen
ensureMessageSearchScopeValid()
if (!messageSearchOpen.value) return
if (!messageSearchOpen.value) {
closeMessageSearch('toggle-close')
return
}
closeTimeSidebar()
await nextTick()
try {
messageSearchInputRef.value?.focus?.()
} catch {}
logSearchPhase('message-search:open:ready', {
scope: String(messageSearchScope.value || 'conversation'),
selectedContactUsername: String(selectedContact.value?.username || '').trim()
})
await fetchMessageSearchIndexStatus()
await fetchMessageSearchSenders()
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'toggle-open-with-query' })
}
}
let messageSearchReqId = 0
const runMessageSearch = async ({ reset } = {}) => {
if (!selectedAccount.value) return
const runMessageSearch = async ({ reset, source = 'unknown' } = {}) => {
if (!selectedAccount.value) {
logSearchPhase('runMessageSearch:skip:no-account', buildRunSearchLogDetails({ source, reset }))
return
}
ensureMessageSearchScopeValid()
const q = String(messageSearchQuery.value || '').trim()
logSearchPhase('runMessageSearch:start', buildRunSearchLogDetails({ source, reset }))
if (!q) {
logSearchPhase('runMessageSearch:skip:empty-query', buildRunSearchLogDetails({ source, reset }))
messageSearchResults.value = []
messageSearchHasMore.value = false
messageSearchError.value = ''
@@ -814,6 +974,10 @@ const reqId = ++messageSearchReqId
messageSearchLoading.value = true
messageSearchError.value = ''
messageSearchBackendStatus.value = ''
logSearchPhase('runMessageSearch:request:start', {
...buildRunSearchLogDetails({ source, reset }),
reqId
})
const scope = String(messageSearchScope.value || 'conversation')
@@ -858,6 +1022,7 @@ if (String(messageSearchSender.value || '').trim()) {
if (scope === 'conversation') {
if (!selectedContact.value?.username) {
logSearchPhase('runMessageSearch:skip:no-selected-contact', buildRunSearchLogDetails({ source, reset }))
messageSearchLoading.value = false
messageSearchError.value = '请选择一个会话再搜索'
return
@@ -867,7 +1032,15 @@ if (scope === 'conversation') {
try {
const resp = await api.searchChatMessages(params)
if (reqId !== messageSearchReqId) return
if (reqId !== messageSearchReqId) {
logSearchPhase('runMessageSearch:response:stale', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
activeReqId: Number(messageSearchReqId || 0),
status: String(resp?.status || '').trim()
})
return
}
if (resp?.index) {
messageSearchIndexInfo.value = resp.index
@@ -877,6 +1050,11 @@ try {
messageSearchBackendStatus.value = status
if (status === 'index_building') {
logSearchPhase('runMessageSearch:response:index-building', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
status
})
if (reset) {
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
@@ -888,6 +1066,12 @@ try {
}
if (status === 'index_error') {
logSearchPhase('runMessageSearch:response:index-error', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
status,
message: String(resp?.message || '')
})
if (reset) {
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
@@ -900,6 +1084,12 @@ try {
}
if (status !== 'success') {
logSearchPhase('runMessageSearch:response:non-success', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
status,
message: String(resp?.message || '')
})
if (reset) {
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
@@ -925,6 +1115,17 @@ try {
messageSearchSelectedIndex.value = 0
}
logSearchPhase('runMessageSearch:response:success', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
status,
hitsCount: hits.length,
total: Number(resp?.total ?? resp?.totalInScan ?? 0),
hasMore: !!resp?.hasMore,
backendScope: String(resp?.scope || '').trim(),
backendUsername: String(resp?.username || '').trim()
})
// 保存搜索历史(仅在有结果时保存)
if (!privacyMode.value && reset && hits.length > 0) {
saveSearchHistory(q)
@@ -932,9 +1133,20 @@ try {
} catch (e) {
if (reqId !== messageSearchReqId) return
messageSearchError.value = e?.message || '搜索失败'
logSearchPhase('runMessageSearch:error', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
error: String(e?.message || e || '')
})
} finally {
if (reqId === messageSearchReqId) {
messageSearchLoading.value = false
logSearchPhase('runMessageSearch:finalize', {
...buildRunSearchLogDetails({ source, reset }),
reqId,
loading: !!messageSearchLoading.value,
error: String(messageSearchError.value || '')
})
}
}
}
@@ -943,7 +1155,7 @@ const loadMoreSearchResults = async () => {
if (!messageSearchHasMore.value) return
if (messageSearchLoading.value) return
messageSearchOffset.value = Number(messageSearchOffset.value || 0) + messageSearchLimit
await runMessageSearch({ reset: false })
await runMessageSearch({ reset: false, source: 'load-more' })
}
const exitSearchContext = async () => {
@@ -980,19 +1192,69 @@ updateJumpToBottomState()
const locateSearchHit = async (hit) => {
if (!process.client) return
if (!selectedAccount.value) return
if (!hit?.id) return
if (!selectedAccount.value) {
logSearchPhase('locateSearchHit:skip:no-account', {
hitId: String(hit?.id || '').trim(),
hitUsername: String(hit?.username || '').trim()
})
return
}
if (!hit?.id) {
logSearchPhase('locateSearchHit:skip:missing-hit-id', {
hitKeys: Object.keys(hit || {})
})
return
}
const targetUsername = String(hit?.username || selectedContact.value?.username || '').trim()
if (!targetUsername) return
if (!targetUsername) {
logSearchPhase('locateSearchHit:skip:missing-target-username', {
hitId: String(hit?.id || '').trim(),
hitUsername: String(hit?.username || '').trim(),
selectedUsernameFallback: String(selectedContact.value?.username || '').trim()
})
return
}
const targetContact = contacts.value.find((c) => c?.username === targetUsername)
logSearchPhase('locateSearchHit:start', {
hitId: String(hit?.id || '').trim(),
hitUsername: String(hit?.username || '').trim(),
targetUsername,
conversationName: String(hit?.conversationName || '').trim()
})
const targetContact = resolveSearchTargetContact({
username: targetUsername,
displayName: String(hit?.conversationName || hit?.username || targetUsername).trim(),
avatar: String(hit?.conversationAvatar || hit?.senderAvatar || '').trim(),
isGroup: targetUsername.endsWith('@chatroom')
})
logSearchPhase('locateSearchHit:target-resolved', {
hitId: String(hit?.id || '').trim(),
targetUsername,
contactResolved: !!targetContact,
contactSource: targetContact
? (contacts.value.find((c) => String(c?.username || '').trim() === targetUsername) ? 'contacts' : 'transient')
: 'none'
})
if (targetContact && selectedContact.value?.username !== targetUsername) {
logSearchPhase('locateSearchHit:selectContact:start', {
hitId: String(hit?.id || '').trim(),
targetUsername
})
await selectContact(targetContact, { skipLoadMessages: true })
logSearchPhase('locateSearchHit:selectContact:done', {
hitId: String(hit?.id || '').trim(),
targetUsername
})
}
if (searchContext.value?.active && searchContext.value.username !== targetUsername) {
await exitSearchContext()
logSearchPhase('locateSearchHit:exitSearchContext:done', {
hitId: String(hit?.id || '').trim(),
targetUsername
})
}
if (!searchContext.value?.active) {
@@ -1019,8 +1281,20 @@ if (!searchContext.value?.active) {
searchContext.value.loadingBefore = false
searchContext.value.loadingAfter = false
}
logSearchPhase('locateSearchHit:search-context:ready', {
hitId: String(hit?.id || '').trim(),
targetUsername,
contextActive: !!searchContext.value?.active,
savedMessagesCount: Number(searchContext.value?.savedMessages?.length || 0)
})
try {
logSearchPhase('locateSearchHit:messagesAround:start', {
hitId: String(hit?.id || '').trim(),
targetUsername,
before: 35,
after: 35
})
const resp = await api.getChatMessagesAround({
account: selectedAccount.value,
username: targetUsername,
@@ -1033,13 +1307,37 @@ try {
const mapped = raw.map(normalizeMessage)
allMessages.value = { ...allMessages.value, [targetUsername]: mapped }
messagesMeta.value = { ...messagesMeta.value, [targetUsername]: { total: mapped.length, hasMore: false } }
logSearchPhase('locateSearchHit:messagesAround:end', {
hitId: String(hit?.id || '').trim(),
targetUsername,
messageCount: mapped.length,
anchorId: String(resp?.anchorId || hit?.id || '').trim(),
anchorIndex: Number(resp?.anchorIndex ?? -1)
})
searchContext.value.anchorId = String(resp?.anchorId || hit.id)
searchContext.value.anchorIndex = Number(resp?.anchorIndex ?? -1)
logSearchPhase('locateSearchHit:scroll:start', {
hitId: String(hit?.id || '').trim(),
targetUsername,
anchorId: String(searchContext.value.anchorId || '').trim(),
messageCount: mapped.length
})
const ok = await scrollToMessageId(searchContext.value.anchorId)
logSearchPhase('locateSearchHit:scroll:end', {
hitId: String(hit?.id || '').trim(),
targetUsername,
anchorId: String(searchContext.value.anchorId || '').trim(),
scrollFound: !!ok
})
if (ok) flashMessage(searchContext.value.anchorId)
} catch (e) {
logSearchPhase('locateSearchHit:error', {
hitId: String(hit?.id || '').trim(),
targetUsername,
error: String(e?.message || e || '')
})
window.alert(e?.message || '定位失败')
}
}
@@ -1051,7 +1349,12 @@ const u = String(targetUsername || selectedContact.value?.username || '').trim()
const anchor = String(anchorId || '').trim()
if (!u || !anchor) return
const targetContact = contacts.value.find((c) => c?.username === u)
const targetContact = resolveSearchTargetContact({
username: u,
displayName: String(selectedContact.value?.name || u).trim(),
avatar: String(selectedContact.value?.avatar || '').trim(),
isGroup: u.endsWith('@chatroom')
})
if (targetContact && selectedContact.value?.username !== u) {
await selectContact(targetContact, { skipLoadMessages: true })
}
@@ -1317,9 +1620,37 @@ try {
}
}
const onSearchHitPointerDown = (hit, idx, event) => {
logSearchPhase('search-result:pointerdown', {
...summarizeHitForLog(hit, idx),
button: Number(event?.button ?? -1),
detail: Number(event?.detail ?? 0),
target: describeEventTarget(event?.target),
currentTarget: describeEventTarget(event?.currentTarget)
})
}
const onSearchHitClickCapture = (hit, idx, event) => {
logSearchPhase('search-result:click-capture', {
...summarizeHitForLog(hit, idx),
button: Number(event?.button ?? -1),
detail: Number(event?.detail ?? 0),
target: describeEventTarget(event?.target),
currentTarget: describeEventTarget(event?.currentTarget)
})
}
const onSearchHitClick = async (hit, idx) => {
messageSearchSelectedIndex.value = Number(idx || 0)
logSearchPhase('onSearchHitClick', {
...summarizeHitForLog(hit, idx),
selectedIndex: Number(messageSearchSelectedIndex.value || 0)
})
await locateSearchHit(hit)
logSearchPhase('onSearchHitClick:done', {
...summarizeHitForLog(hit, idx),
selectedIndex: Number(messageSearchSelectedIndex.value || 0)
})
}
const onSearchNext = async () => {
@@ -1327,7 +1658,7 @@ const q = String(messageSearchQuery.value || '').trim()
if (!q) return
if (!messageSearchResults.value.length && !messageSearchLoading.value) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'search-next-bootstrap' })
}
if (!messageSearchResults.value.length) return
@@ -1342,7 +1673,7 @@ const q = String(messageSearchQuery.value || '').trim()
if (!q) return
if (!messageSearchResults.value.length && !messageSearchLoading.value) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'search-prev-bootstrap' })
}
if (!messageSearchResults.value.length) return
@@ -1355,13 +1686,27 @@ const openMessageSearch = async () => {
closeTimeSidebar()
messageSearchOpen.value = true
ensureMessageSearchScopeValid()
logSearchPhase('message-search:open:start', {
scope: String(messageSearchScope.value || 'conversation'),
queryLength: String(messageSearchQuery.value || '').trim().length
})
await nextTick()
try {
messageSearchInputRef.value?.focus?.()
} catch {}
await fetchMessageSearchIndexStatus()
logSearchPhase('message-search:open:end', {
scope: String(messageSearchScope.value || 'conversation'),
queryLength: String(messageSearchQuery.value || '').trim().length
})
}
watch(messageSearchScope, async () => {
watch(messageSearchScope, async (next, prev) => {
logSearchPhase('message-search-scope:change', {
previous: String(prev || '').trim(),
next: String(next || '').trim(),
open: !!messageSearchOpen.value,
queryLength: String(messageSearchQuery.value || '').trim().length
})
if (!messageSearchOpen.value) return
ensureMessageSearchScopeValid()
closeMessageSearchSenderDropdown()
@@ -1373,22 +1718,34 @@ messageSearchOffset.value = 0
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'scope-change' })
}
})
watch(messageSearchRangeDays, async () => {
watch(messageSearchRangeDays, async (next, prev) => {
logSearchPhase('message-search-range:change', {
previous: String(prev || '').trim(),
next: String(next || '').trim(),
open: !!messageSearchOpen.value,
queryLength: String(messageSearchQuery.value || '').trim().length
})
if (!messageSearchOpen.value) return
closeMessageSearchSenderDropdown()
messageSearchOffset.value = 0
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'range-change' })
}
})
watch(messageSearchSessionType, async () => {
watch(messageSearchSessionType, async (next, prev) => {
logSearchPhase('message-search-session-type:change', {
previous: String(prev || '').trim(),
next: String(next || '').trim(),
open: !!messageSearchOpen.value,
queryLength: String(messageSearchQuery.value || '').trim().length
})
if (!messageSearchOpen.value) return
if (String(messageSearchScope.value || '') !== 'global') return
closeMessageSearchSenderDropdown()
@@ -1400,11 +1757,18 @@ messageSearchOffset.value = 0
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'session-type-change' })
}
})
watch([messageSearchStartDate, messageSearchEndDate], async () => {
watch([messageSearchStartDate, messageSearchEndDate], async ([nextStart, nextEnd], [prevStart, prevEnd]) => {
logSearchPhase('message-search-custom-range:change', {
previousStart: String(prevStart || '').trim(),
previousEnd: String(prevEnd || '').trim(),
nextStart: String(nextStart || '').trim(),
nextEnd: String(nextEnd || '').trim(),
open: !!messageSearchOpen.value
})
if (!messageSearchOpen.value) return
if (String(messageSearchRangeDays.value || '') !== 'custom') return
closeMessageSearchSenderDropdown()
@@ -1412,34 +1776,62 @@ messageSearchOffset.value = 0
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'custom-range-change' })
}
})
watch(messageSearchSender, async () => {
watch(messageSearchSender, async (next, prev) => {
logSearchPhase('message-search-sender:change', {
previous: String(prev || '').trim(),
next: String(next || '').trim(),
open: !!messageSearchOpen.value,
queryLength: String(messageSearchQuery.value || '').trim().length
})
if (!messageSearchOpen.value) return
messageSearchOffset.value = 0
messageSearchResults.value = []
messageSearchSelectedIndex.value = -1
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'sender-change' })
}
})
watch(messageSearchQuery, () => {
watch(messageSearchQuery, (next, prev) => {
logSearchPhase('message-search-query:change', {
previousLength: String(prev || '').trim().length,
nextLength: String(next || '').trim().length,
open: !!messageSearchOpen.value
})
if (!messageSearchOpen.value) return
if (messageSearchDebounceTimer) clearTimeout(messageSearchDebounceTimer)
messageSearchDebounceTimer = null
const q = String(messageSearchQuery.value || '').trim()
if (q.length < 2) return
if (q.length < 2) {
logSearchPhase('message-search-query:debounce:skip-short', {
queryLength: q.length
})
return
}
logSearchPhase('message-search-query:debounce:scheduled', {
queryLength: q.length,
delayMs: 280
})
messageSearchDebounceTimer = setTimeout(() => {
runMessageSearch({ reset: true })
logSearchPhase('message-search-query:debounce:fire', {
queryLength: String(messageSearchQuery.value || '').trim().length
})
runMessageSearch({ reset: true, source: 'query-debounce' })
}, 280)
})
watch(
() => selectedContact.value?.username,
async () => {
logSearchPhase('message-search-selected-contact:change', {
open: !!messageSearchOpen.value,
scope: String(messageSearchScope.value || '').trim(),
selectedContactUsername: String(selectedContact.value?.username || '').trim()
})
if (!messageSearchOpen.value) return
if (String(messageSearchScope.value || '') !== 'conversation') return
closeMessageSearchSenderDropdown()
@@ -1448,11 +1840,31 @@ async () => {
messageSearchSenderOptionsKey.value = ''
await fetchMessageSearchSenders()
if (String(messageSearchQuery.value || '').trim()) {
await runMessageSearch({ reset: true })
await runMessageSearch({ reset: true, source: 'selected-contact-change' })
}
}
)
watch(messageSearchOpen, (next, prev) => {
logSearchPhase('message-search-open:change', {
previous: !!prev,
next: !!next,
queryLength: String(messageSearchQuery.value || '').trim().length,
resultCount: Number(messageSearchResults.value?.length || 0)
})
})
watch(messageSearchResults, (next, prev) => {
const nextList = Array.isArray(next) ? next : []
const prevList = Array.isArray(prev) ? prev : []
logSearchPhase('message-search-results:change', {
previousCount: prevList.length,
nextCount: nextList.length,
firstHitId: String(nextList[0]?.id || '').trim(),
selectedIndex: Number(messageSearchSelectedIndex.value ?? -1)
})
})
const autoLoadReady = ref(true)
let timeSidebarScrollSyncRaf = null
@@ -1664,6 +2076,8 @@ if (c.scrollTop <= 60 && autoLoadReady.value && hasMoreMessages.value && !isLoad
onTimeSidebarDayClick,
loadMoreSearchContextAfter,
loadMoreSearchContextBefore,
onSearchHitPointerDown,
onSearchHitClickCapture,
onSearchHitClick,
onSearchNext,
onSearchPrev,
+39 -2
View File
@@ -119,6 +119,23 @@ const nextMessageLoadToken = () => {
return messageLoadSequence
}
const buildTransientContact = ({ username, name = '', avatar = '', isGroup = null } = {}) => {
const u = String(username || '').trim()
const displayName = String(name || u).trim() || u
return {
id: u,
username: u,
name: displayName,
avatar: String(avatar || '').trim() || null,
avatarColor: '#4B5563',
lastMessage: '',
lastMessageTime: '',
unreadCount: 0,
isGroup: typeof isGroup === 'boolean' ? isGroup : u.endsWith('@chatroom'),
isTop: false
}
}
const buildChatPath = (username) => {
return username ? `/chat/${encodeURIComponent(username)}` : '/chat'
}
@@ -334,14 +351,28 @@ const selectContact = async (contact, options = {}) => {
}
const applyRouteSelection = async (options = {}) => {
const selectionReason = String(options.reason || 'route-selection').trim() || 'route-selection'
const requested = routeUsername.value || ''
if ((!contacts.value || contacts.value.length === 0) && requested) {
if (selectedContact.value?.username === requested) {
return
}
await selectContact(buildTransientContact({ username: requested }), {
syncRoute: false,
deferLoadMessages: !!options.deferLoadMessages,
reason: `${selectionReason}:transient-route-empty-list`
})
return
}
if (!contacts.value || contacts.value.length === 0) {
selectedContact.value = null
return
}
const selectionReason = String(options.reason || 'route-selection').trim() || 'route-selection'
const requested = routeUsername.value || ''
if (requested) {
if (selectedContact.value?.username === requested) {
return
}
const matched = contacts.value.find((contact) => contact.username === requested)
if (matched) {
if (selectedContact.value?.username !== matched.username) {
@@ -353,6 +384,12 @@ const applyRouteSelection = async (options = {}) => {
}
return
}
await selectContact(buildTransientContact({ username: requested }), {
syncRoute: false,
deferLoadMessages: !!options.deferLoadMessages,
reason: `${selectionReason}:transient-route`
})
return
}
await selectContact(contacts.value[0], {
+83 -6
View File
@@ -14,6 +14,7 @@ from fastapi import HTTPException
from .app_paths import get_output_databases_dir
from .logging_config import get_logger
from .sqlite_diagnostics import collect_sqlite_diagnostics, format_sqlite_diagnostics
try:
import zstandard as zstd # type: ignore
@@ -1755,9 +1756,10 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
session_db_path = Path(account_dir) / "session.db"
if session_db_path.exists() and remaining:
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
sconn: Optional[sqlite3.Connection] = None
try:
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
uniq = list(dict.fromkeys([u for u in remaining if u]))
chunk_size = 900
for i in range(0, len(uniq), chunk_size):
@@ -1786,10 +1788,24 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
if not u:
continue
expected_ts_by_user[u] = int(r["last_timestamp"] or 0)
except sqlite3.DatabaseError as e:
expected_ts_by_user = {}
logger.warning(
"[sessions.preview] session timestamp lookup failed account=%s db=%s usernames=%s sample_usernames=%s error=%s diag=%s",
account_dir.name,
str(session_db_path),
len(remaining),
sorted([u for u in remaining if u])[:5],
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(session_db_path, quick_check=True, table_name="SessionTable")
),
)
except Exception:
expected_ts_by_user = {}
finally:
sconn.close()
if sconn is not None:
sconn.close()
if _DEBUG_SESSIONS:
logger.info(
@@ -1800,9 +1816,16 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
)
for db_path in db_paths:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn: Optional[sqlite3.Connection] = None
stage = "connect"
stage_username = ""
stage_table = ""
try:
stage = "connect"
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
stage = "sqlite_master"
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
names = [str(r[0]) for r in rows if r and r[0]]
lower_to_actual = {n.lower(): n for n in names}
@@ -1818,9 +1841,12 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
conn.text_factory = bytes
for u, tn in found.items():
stage_username = str(u)
stage_table = str(tn)
quoted = _quote_ident(tn)
try:
try:
stage = "latest_row_with_name2id"
r = conn.execute(
"SELECT "
"m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, "
@@ -1831,6 +1857,7 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
"LIMIT 1"
).fetchone()
except Exception:
stage = "latest_row_without_name2id"
r = conn.execute(
"SELECT "
"local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username "
@@ -1838,6 +1865,20 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
"ORDER BY sort_seq DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except sqlite3.DatabaseError as e:
logger.warning(
"[sessions.preview] latest row query failed account=%s db=%s username=%s table=%s stage=%s error=%s diag=%s",
account_dir.name,
str(db_path),
str(u),
str(tn),
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(db_path, quick_check=True, table_name=tn)
),
)
continue
except Exception as e:
if _DEBUG_SESSIONS:
logger.info(
@@ -1855,6 +1896,7 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
expected_ts = int(expected_ts_by_user.get(u) or 0)
if expected_ts > 0 and create_time > 0 and create_time < expected_ts:
try:
stage = "latest_row_by_create_time_with_name2id"
r2 = conn.execute(
"SELECT "
"m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, "
@@ -1866,6 +1908,7 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
).fetchone()
except Exception:
try:
stage = "latest_row_by_create_time_without_name2id"
r2 = conn.execute(
"SELECT "
"local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username "
@@ -1873,6 +1916,20 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
"ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except sqlite3.DatabaseError as e:
logger.warning(
"[sessions.preview] latest row requery failed account=%s db=%s username=%s table=%s stage=%s error=%s diag=%s",
account_dir.name,
str(db_path),
str(u),
str(tn),
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(db_path, quick_check=True, table_name=tn)
),
)
r2 = None
except Exception:
r2 = None
@@ -1900,8 +1957,28 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
prev = best.get(u)
if prev is None or sort_key > prev[0]:
best[u] = (sort_key, preview)
except sqlite3.DatabaseError as e:
logger.warning(
"[sessions.preview] malformed message db account=%s db=%s stage=%s username=%s table=%s remaining=%s sample_usernames=%s error=%s diag=%s",
account_dir.name,
str(db_path),
stage,
stage_username,
stage_table,
len(remaining),
sorted([u for u in remaining if u])[:5],
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(db_path, quick_check=True, table_name=(stage_table or None))
),
)
continue
finally:
conn.close()
if conn is not None:
try:
conn.close()
except Exception:
pass
previews = {u: v[1] for u, v in best.items() if v and v[1]}
if _DEBUG_SESSIONS:
@@ -123,9 +123,62 @@ class ChatRealtimeAutoSyncService:
self._mu = threading.Lock()
self._states: dict[str, _AccountState] = {}
self._paused_accounts: dict[str, int] = {}
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
def _is_account_paused_locked(self, account: str) -> bool:
key = str(account or "").strip()
if not key:
return False
return int(self._paused_accounts.get(key) or 0) > 0
def is_account_paused(self, account: str) -> bool:
with self._mu:
return self._is_account_paused_locked(account)
def pause_account(self, account: str, reason: str = "") -> int:
key = str(account or "").strip()
if not key:
return 0
with self._mu:
depth = int(self._paused_accounts.get(key) or 0) + 1
self._paused_accounts[key] = depth
st = self._states.get(key)
if st is not None:
st.due_at = 0.0
logger.info(
"[realtime-autosync] pause account=%s reason=%s depth=%s",
key,
str(reason or "").strip() or "-",
int(depth),
)
return depth
def resume_account(self, account: str, reason: str = "") -> int:
key = str(account or "").strip()
if not key:
return 0
with self._mu:
current = int(self._paused_accounts.get(key) or 0)
if current <= 1:
self._paused_accounts.pop(key, None)
depth = 0
else:
depth = current - 1
self._paused_accounts[key] = depth
logger.info(
"[realtime-autosync] resume account=%s reason=%s depth=%s",
key,
str(reason or "").strip() or "-",
int(depth),
)
return depth
def start(self) -> None:
if not self._enabled:
logger.info("[realtime-autosync] disabled by env WECHAT_TOOL_REALTIME_AUTOSYNC=0")
@@ -188,6 +241,12 @@ class ChatRealtimeAutoSyncService:
if self._stop.is_set():
break
if self.is_account_paused(acc):
with self._mu:
st = self._states.setdefault(acc, _AccountState())
st.due_at = 0.0
continue
try:
account_dir = _resolve_account_dir(acc)
except HTTPException:
@@ -238,6 +297,9 @@ class ChatRealtimeAutoSyncService:
for acc, st in self._states.items():
if running >= int(self._workers):
break
if self._is_account_paused_locked(acc):
st.due_at = 0.0
continue
if st.due_at <= 0 or st.due_at > now:
continue
if st.thread is not None and st.thread.is_alive():
@@ -278,6 +340,9 @@ class ChatRealtimeAutoSyncService:
try:
if self._stop.is_set() or (not account):
return
if self.is_account_paused(account):
logger.info("[realtime-autosync] sync skipped account=%s reason=paused", account)
return
res = self._sync_account(account)
inserted = int((res or {}).get("inserted_total") or (res or {}).get("insertedTotal") or 0)
synced = int((res or {}).get("synced") or (res or {}).get("sessionsSynced") or 0)
@@ -297,6 +362,8 @@ class ChatRealtimeAutoSyncService:
account = str(account or "").strip()
if not account:
return {"status": "skipped", "reason": "missing account"}
if self.is_account_paused(account):
return {"status": "skipped", "reason": "paused"}
try:
account_dir = _resolve_account_dir(account)
+167 -10
View File
@@ -77,6 +77,7 @@ from ..session_last_message import (
get_session_last_message_status,
load_session_last_messages,
)
from ..sqlite_diagnostics import collect_sqlite_diagnostics, format_sqlite_diagnostics
from ..wcdb_realtime import (
WCDBRealtimeError,
WCDB_REALTIME,
@@ -2022,12 +2023,18 @@ def _sync_chat_realtime_messages_for_table(
if backfill_limit > max_scan:
backfill_limit = max_scan
msg_conn = sqlite3.connect(str(msg_db_path))
msg_conn.row_factory = sqlite3.Row
msg_conn: Optional[sqlite3.Connection] = None
stage = "connect"
try:
stage = "connect"
msg_conn = sqlite3.connect(str(msg_db_path))
msg_conn.row_factory = sqlite3.Row
stage = "resolve_db_storage_paths"
msg_db_path_real, _res_db_path_real = _resolve_db_storage_message_paths(account_dir, msg_db_path.stem)
name2id_synced = False
try:
stage = "sync_name2id"
name2id_result = _sync_output_name2id_from_live(
msg_conn,
rt_conn=rt_conn,
@@ -2050,12 +2057,14 @@ def _sync_chat_realtime_messages_for_table(
)
quoted_table = _quote_ident(table_name)
stage = "max_local_id"
row = msg_conn.execute(f"SELECT MAX(local_id) AS mx FROM {quoted_table}").fetchone()
try:
max_local_id = int((row["mx"] if row is not None else 0) or 0)
except Exception:
max_local_id = 0
stage = "pragma_table_info"
cols = msg_conn.execute(f"PRAGMA table_info({quoted_table})").fetchall()
available_cols = {str(c[1] or "") for c in cols}
base_cols = [
@@ -2075,6 +2084,7 @@ def _sync_chat_realtime_messages_for_table(
placeholders = ",".join(["?"] * len(insert_cols))
insert_sql = f"INSERT OR IGNORE INTO {quoted_table} ({','.join(insert_cols)}) VALUES ({placeholders})"
stage = "collect_realtime_rows"
fetch_result = _collect_realtime_rows_for_session(
trace_id=None,
account_name=account_dir.name,
@@ -2094,6 +2104,7 @@ def _sync_chat_realtime_messages_for_table(
backfilled = 0
if new_rows:
if not name2id_synced:
stage = "upsert_name2id_fallback"
_best_effort_upsert_output_name2id_rows(
msg_conn,
account_name=account_dir.name,
@@ -2102,6 +2113,7 @@ def _sync_chat_realtime_messages_for_table(
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
insert_t0 = time.perf_counter()
stage = "insert_new_rows"
msg_conn.executemany(insert_sql, values)
msg_conn.commit()
insert_ms = (time.perf_counter() - insert_t0) * 1000.0
@@ -2131,6 +2143,7 @@ def _sync_chat_realtime_messages_for_table(
if update_values:
before_changes = msg_conn.total_changes
update_t0 = time.perf_counter()
stage = "backfill_packed_info"
msg_conn.executemany(
f"UPDATE {quoted_table} SET packed_info_data = ? WHERE local_id = ? AND (packed_info_data IS NULL OR length(packed_info_data) = 0)",
update_values,
@@ -2187,8 +2200,11 @@ def _sync_chat_realtime_messages_for_table(
if inserted and newest_ts:
session_db_path = account_dir / "session.db"
sconn = sqlite3.connect(str(session_db_path))
sconn: Optional[sqlite3.Connection] = None
try:
stage = "open_session_db"
sconn = sqlite3.connect(str(session_db_path))
stage = "update_session_table"
sconn.execute("INSERT OR IGNORE INTO SessionTable(username) VALUES (?)", (username,))
sconn.execute(
"""
@@ -2217,6 +2233,7 @@ def _sync_chat_realtime_messages_for_table(
),
)
stage = "update_session_last_message"
_ensure_session_last_message_table(sconn)
sconn.execute(
"""
@@ -2239,8 +2256,25 @@ def _sync_chat_realtime_messages_for_table(
),
)
sconn.commit()
except sqlite3.DatabaseError as e:
logger.warning(
"[realtime] malformed session db during sync account=%s username=%s session_db=%s stage=%s error=%s diag=%s",
account_dir.name,
username,
str(session_db_path),
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(session_db_path, quick_check=True, table_name="SessionTable")
),
)
raise HTTPException(
status_code=500,
detail=f"Malformed session db during realtime sync: {session_db_path.name}",
)
finally:
sconn.close()
if sconn is not None:
sconn.close()
return {
"username": username,
@@ -2250,8 +2284,23 @@ def _sync_chat_realtime_messages_for_table(
"backfilled": int(backfilled),
"preview": preview or "",
}
except sqlite3.DatabaseError as e:
logger.warning(
"[realtime] malformed decrypted message db account=%s username=%s db=%s table=%s stage=%s error=%s diag=%s",
account_dir.name,
username,
str(msg_db_path),
table_name,
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(msg_db_path, quick_check=True, table_name=table_name)
),
)
raise HTTPException(status_code=500, detail=f"Malformed decrypted message db: {msg_db_path.name}")
finally:
msg_conn.close()
if msg_conn is not None:
msg_conn.close()
@router.post("/api/chat/realtime/sync_all", summary="实时消息同步到解密库(全会话增量)")
@@ -2545,20 +2594,24 @@ def sync_chat_realtime_messages_all(
except HTTPException as e:
errors.append(f"{uname}: {str(e.detail or '')}".strip())
logger.warning(
"[%s] sync session failed account=%s username=%s err=%s",
"[%s] sync session failed account=%s username=%s db=%s table=%s err=%s",
trace_id,
account_dir.name,
uname,
str(msg_db_path),
str(table_name),
str(e.detail or "").strip(),
)
continue
except Exception as e:
errors.append(f"{uname}: {str(e)}".strip())
logger.exception(
"[%s] sync session crashed account=%s username=%s",
"[%s] sync session crashed account=%s username=%s db=%s table=%s",
trace_id,
account_dir.name,
uname,
str(msg_db_path),
str(table_name),
)
continue
@@ -4173,6 +4226,15 @@ def list_chat_sessions(
)
last_previews = load_session_last_messages(account_dir, usernames)
except Exception:
logger.exception(
"[sessions.list] session_last_message preview load failed account=%s preview_mode=%s usernames=%s diag=%s",
account_dir.name,
preview_mode,
len(usernames),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(account_dir / "session.db", quick_check=True, table_name="session_last_message")
),
)
last_previews = {}
def _is_generic_location_preview(value: Any) -> bool:
@@ -4189,7 +4251,17 @@ def list_chat_sessions(
else [u for u in usernames if u and ((u not in last_previews) or _is_generic_location_preview(last_previews.get(u)))]
)
if targets:
legacy = _load_latest_message_previews(account_dir, targets)
try:
legacy = _load_latest_message_previews(account_dir, targets)
except Exception:
logger.exception(
"[sessions.list] legacy latest-message preview fallback failed account=%s preview_mode=%s targets=%s sample_targets=%s; falling back to session summaries",
account_dir.name,
preview_mode,
len(targets),
[str(u) for u in targets[:5]],
)
legacy = {}
for u, v in legacy.items():
if v:
last_previews[u] = v
@@ -6118,6 +6190,24 @@ async def _search_chat_messages_via_fts(
sender = None
session_type_norm = _normalize_session_type(session_type)
trace_id = f"msg-search-{int(time.time() * 1000)}-{threading.get_ident()}"
logger.info(
"[%s] chat search start account=%s scope=%s username=%s sender=%s q_len=%s token_count=%s limit=%s offset=%s start_time=%s end_time=%s render_types=%s include_hidden=%s include_official=%s",
trace_id,
str(account or "").strip(),
"conversation" if username else "global",
str(username or "").strip(),
str(sender or "").strip(),
len(str(q or "")),
len(tokens),
int(limit),
int(offset),
"" if start_ts is None else int(start_ts),
"" if end_ts is None else int(end_ts),
str(render_types or "").strip(),
bool(include_hidden),
bool(include_official),
)
account_dir = _resolve_account_dir(account)
contact_db_path = account_dir / "contact.db"
@@ -6142,6 +6232,14 @@ async def _search_chat_messages_via_fts(
index_ready = bool(index.get("ready"))
if build_status == "error":
logger.warning(
"[%s] chat search index_error account=%s scope=%s username=%s message=%s",
trace_id,
account_dir.name,
"conversation" if username else "global",
str(username or "").strip(),
str(build.get("error") or "Search index build failed."),
)
return {
"status": "index_error",
"account": account_dir.name,
@@ -6160,6 +6258,14 @@ async def _search_chat_messages_via_fts(
}
if not index_ready:
logger.info(
"[%s] chat search index_building account=%s scope=%s username=%s build_status=%s",
trace_id,
account_dir.name,
"conversation" if username else "global",
str(username or "").strip(),
build_status,
)
return {
"status": "index_building",
"account": account_dir.name,
@@ -6243,7 +6349,13 @@ async def _search_chat_messages_via_fts(
params + [int(limit), int(offset)],
).fetchall()
except Exception as e:
logger.exception("Chat search index query failed")
logger.exception(
"[%s] chat search index query failed account=%s scope=%s username=%s",
trace_id,
account_dir.name,
"conversation" if username else "global",
str(username or "").strip(),
)
return {
"status": "index_error",
"account": account_dir.name,
@@ -6551,7 +6663,7 @@ async def _search_chat_messages_via_fts(
wcdb_display_names=wcdb_display_names,
)
return {
response = {
"status": "success",
"account": account_dir.name,
"scope": scope,
@@ -6566,6 +6678,19 @@ async def _search_chat_messages_via_fts(
"index": index,
"hits": hits,
}
logger.info(
"[%s] chat search done account=%s scope=%s username=%s sender=%s total=%s hits=%s has_more=%s rows=%s",
trace_id,
account_dir.name,
scope,
str(username or "").strip(),
str(sender or "").strip(),
int(total),
len(hits),
bool(response["hasMore"]),
len(rows),
)
return response
@router.get("/api/chat/search", summary="搜索聊天记录(消息)")
@@ -6999,13 +7124,26 @@ async def get_chat_messages_around(
if after > 200:
after = 200
trace_id = f"msg-around-{int(time.time() * 1000)}-{threading.get_ident()}"
logger.info(
"[%s] chat messages around start account=%s username=%s anchor_id=%s before=%s after=%s",
trace_id,
str(account or "").strip(),
str(username or "").strip(),
str(anchor_id or "").strip(),
int(before),
int(after),
)
parts = str(anchor_id).split(":", 2)
if len(parts) != 3:
logger.warning("[%s] chat messages around invalid anchor format anchor_id=%s", trace_id, str(anchor_id or "").strip())
raise HTTPException(status_code=400, detail="Invalid anchor_id.")
anchor_db_stem, anchor_table_name_in, anchor_local_id_str = parts
try:
anchor_local_id = int(anchor_local_id_str)
except Exception:
logger.warning("[%s] chat messages around invalid anchor local_id anchor_id=%s", trace_id, str(anchor_id or "").strip())
raise HTTPException(status_code=400, detail="Invalid anchor_id.")
account_dir = _resolve_account_dir(account)
@@ -7021,6 +7159,13 @@ async def get_chat_messages_around(
anchor_db_path = p
break
if anchor_db_path is None:
logger.warning(
"[%s] chat messages around anchor db missing account=%s username=%s anchor_db=%s",
trace_id,
account_dir.name,
username,
anchor_db_stem,
)
raise HTTPException(status_code=404, detail="Anchor database not found.")
# Open resource DB once (optional), and reuse for all message DBs.
@@ -7419,6 +7564,18 @@ async def get_chat_messages_around(
head_image_db_path=head_image_db_path,
)
logger.info(
"[%s] chat messages around done account=%s username=%s anchor_id=%s canonical_anchor=%s anchor_index=%s returned=%s merged_total=%s",
trace_id,
account_dir.name,
username,
str(anchor_id or "").strip(),
anchor_id_canon,
int(anchor_index),
len(return_messages),
len(merged),
)
return {
"status": "success",
"account": account_dir.name,
+319 -159
View File
@@ -5,12 +5,14 @@ import json
import os
import time
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from starlette.responses import StreamingResponse
from ..app_paths import get_output_databases_dir
from ..chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC
from ..logging_config import get_logger
from ..path_fix import PathFixRoute
from ..key_store import upsert_account_keys_in_store
@@ -21,6 +23,96 @@ logger = get_logger(__name__)
router = APIRouter(route_class=PathFixRoute)
def _normalize_decrypt_guard_accounts(accounts: Any) -> list[str]:
if not accounts:
return []
out: list[str] = []
seen: set[str] = set()
for account in accounts:
key = str(account or "").strip()
if (not key) or (key in seen):
continue
seen.add(key)
out.append(key)
out.sort()
return out
def _resolve_decrypt_guard_accounts(db_storage_path: str) -> list[str]:
try:
scan_result = scan_account_databases_from_path(db_storage_path)
except Exception:
logger.exception("[decrypt] pre-scan accounts failed db_storage_path=%s", db_storage_path)
return []
if scan_result.get("status") == "error":
return []
return _normalize_decrypt_guard_accounts((scan_result.get("account_databases") or {}).keys())
def _get_realtime_sync_all_lock(account: str):
from .chat import _realtime_sync_all_lock
return _realtime_sync_all_lock(account)
def _release_decrypt_account_guards(guards: list[tuple[str, Any]], *, reason: str) -> None:
for account, lock in reversed(list(guards or [])):
try:
lock.release()
logger.info("[decrypt] released realtime sync_all lock account=%s reason=%s", account, reason)
except Exception:
logger.exception("[decrypt] release realtime sync_all lock failed account=%s reason=%s", account, reason)
try:
CHAT_REALTIME_AUTOSYNC.resume_account(account, reason=reason)
logger.info("[decrypt] resumed realtime autosync for account after decrypt account=%s reason=%s", account, reason)
except Exception:
logger.exception(
"[decrypt] resume realtime autosync failed account=%s reason=%s",
account,
reason,
)
def _acquire_decrypt_account_guards(accounts: Any, *, reason: str) -> list[tuple[str, Any]]:
guards: list[tuple[str, Any]] = []
for account in _normalize_decrypt_guard_accounts(accounts):
paused = False
try:
CHAT_REALTIME_AUTOSYNC.pause_account(account, reason=reason)
paused = True
logger.info("[decrypt] paused realtime autosync for account during decrypt account=%s reason=%s", account, reason)
lock = _get_realtime_sync_all_lock(account)
logger.info("[decrypt] waiting realtime sync_all lock account=%s reason=%s", account, reason)
lock.acquire()
logger.info("[decrypt] acquired realtime sync_all lock account=%s reason=%s", account, reason)
guards.append((account, lock))
except Exception:
if paused:
try:
CHAT_REALTIME_AUTOSYNC.resume_account(account, reason=f"{reason}:acquire_failed")
logger.info(
"[decrypt] resumed realtime autosync after guard acquire failure account=%s reason=%s",
account,
f"{reason}:acquire_failed",
)
except Exception:
logger.exception(
"[decrypt] resume realtime autosync after guard acquire failure failed account=%s reason=%s",
account,
reason,
)
_release_decrypt_account_guards(guards, reason=reason)
raise
return guards
class DecryptRequest(BaseModel):
"""解密请求模型"""
@@ -48,17 +140,28 @@ async def decrypt_databases(request: DecryptRequest):
logger.warning(f"密钥格式无效: 长度={len(request.key) if request.key else 0}")
raise HTTPException(status_code=400, detail="密钥格式无效,必须是64位十六进制字符串")
# 使用新的解密API
results = decrypt_wechat_databases(
db_storage_path=request.db_storage_path,
key=request.key,
)
guard_accounts = _resolve_decrypt_guard_accounts(request.db_storage_path)
guards = _acquire_decrypt_account_guards(guard_accounts, reason="decrypt:post")
try:
# 使用新的解密API
results = decrypt_wechat_databases(
db_storage_path=request.db_storage_path,
key=request.key,
)
finally:
_release_decrypt_account_guards(guards, reason="decrypt:post")
if results["status"] == "error":
logger.error(f"解密失败: {results['message']}")
raise HTTPException(status_code=400, detail=results["message"])
logger.info(f"解密完成: 成功 {results['successful_count']}/{results['total_databases']} 个数据库")
if int(results.get("diagnostic_warning_count") or 0) > 0:
logger.warning(
"解密完成但检测到诊断告警: warning_dbs=%s total=%s",
int(results.get("diagnostic_warning_count") or 0),
int(results.get("total_databases") or 0),
)
# 成功解密后,按账号保存数据库密钥(用于前端自动回填)
try:
@@ -77,6 +180,7 @@ async def decrypt_databases(request: DecryptRequest):
"processed_files": results["processed_files"],
"failed_files": results["failed_files"],
"account_results": results.get("account_results", {}),
"diagnostic_warning_count": int(results.get("diagnostic_warning_count") or 0),
}
except HTTPException:
@@ -141,105 +245,146 @@ async def decrypt_databases_stream(
account_sources = scan_result.get("account_sources", {})
total_databases = sum(len(dbs) for dbs in account_databases.values())
yield _sse({"type": "start", "total": total_databases, "message": f"开始解密 {total_databases} 个数据库"})
await asyncio.sleep(0)
# 3) Init output dir & decryptor.
base_output_dir = get_output_databases_dir()
base_output_dir.mkdir(parents=True, exist_ok=True)
decrypt_guards: list[tuple[str, Any]] = []
try:
decryptor = WeChatDatabaseDecryptor(k)
except ValueError as e:
yield _sse({"type": "error", "message": f"密钥错误: {e}"})
return
# 4) Decrypt per account, stream progress.
success_count = 0
fail_count = 0
processed_files: list[str] = []
failed_files: list[str] = []
account_results: dict = {}
overall_current = 0
for account, dbs in account_databases.items():
account_output_dir = base_output_dir / account
account_output_dir.mkdir(parents=True, exist_ok=True)
# Save a hint for later UI (same as non-stream endpoint).
try:
source_info = account_sources.get(account, {})
source_db_storage_path = str(source_info.get("db_storage_path") or p)
wxid_dir = str(source_info.get("wxid_dir") or "")
(account_output_dir / "_source.json").write_text(
json.dumps({"db_storage_path": source_db_storage_path, "wxid_dir": wxid_dir}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except Exception:
pass
account_success = 0
account_processed: list[str] = []
account_failed: list[str] = []
for db_info in dbs:
if await request.is_disconnected():
return
overall_current += 1
db_path = str(db_info.get("path") or "")
db_name = str(db_info.get("name") or "")
current_file = f"{account}/{db_name}" if account else db_name
# Emit a "processing" event so UI updates immediately for large db files.
guard_accounts = _normalize_decrypt_guard_accounts(account_databases.keys())
if guard_accounts:
yield _sse(
{
"type": "progress",
"current": overall_current,
"total": total_databases,
"success_count": success_count,
"fail_count": fail_count,
"current_file": current_file,
"status": "processing",
"message": "解密中...",
"type": "phase",
"phase": "decrypt_guard",
"message": "正在暂停实时同步并等待解密写锁...",
}
)
await asyncio.sleep(0)
decrypt_guards = await asyncio.to_thread(
_acquire_decrypt_account_guards,
guard_accounts,
reason="decrypt:sse",
)
output_path = account_output_dir / db_name
task = asyncio.create_task(asyncio.to_thread(decryptor.decrypt_database, db_path, str(output_path)))
yield _sse({"type": "start", "total": total_databases, "message": f"开始解密 {total_databases} 个数据库"})
await asyncio.sleep(0)
# Wait with heartbeat (can't yield while awaiting the thread directly).
last_heartbeat = time.time()
while not task.done():
# 3) Init output dir & decryptor.
base_output_dir = get_output_databases_dir()
base_output_dir.mkdir(parents=True, exist_ok=True)
try:
decryptor = WeChatDatabaseDecryptor(k)
except ValueError as e:
yield _sse({"type": "error", "message": f"密钥错误: {e}"})
return
# 4) Decrypt per account, stream progress.
success_count = 0
fail_count = 0
processed_files: list[str] = []
failed_files: list[str] = []
account_results: dict = {}
diagnostic_warning_count = 0
overall_current = 0
for account, dbs in account_databases.items():
account_output_dir = base_output_dir / account
account_output_dir.mkdir(parents=True, exist_ok=True)
# Save a hint for later UI (same as non-stream endpoint).
try:
source_info = account_sources.get(account, {})
source_db_storage_path = str(source_info.get("db_storage_path") or p)
wxid_dir = str(source_info.get("wxid_dir") or "")
(account_output_dir / "_source.json").write_text(
json.dumps(
{"db_storage_path": source_db_storage_path, "wxid_dir": wxid_dir},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
except Exception:
pass
account_success = 0
account_processed: list[str] = []
account_failed: list[str] = []
account_db_diagnostics: dict[str, dict] = {}
account_diagnostic_warning_count = 0
for db_info in dbs:
if await request.is_disconnected():
return
now = time.time()
if now - last_heartbeat > 15:
last_heartbeat = now
# SSE comment heartbeat; browsers ignore but keeps proxies alive.
yield ": ping\n\n"
await asyncio.sleep(0.6)
try:
ok = bool(task.result())
except Exception:
ok = False
if ok:
account_success += 1
success_count += 1
account_processed.append(str(output_path))
processed_files.append(str(output_path))
status = "success"
msg = "解密成功"
else:
account_failed.append(db_path)
failed_files.append(db_path)
fail_count += 1
status = "fail"
msg = "解密失败"
overall_current += 1
db_path = str(db_info.get("path") or "")
db_name = str(db_info.get("name") or "")
current_file = f"{account}/{db_name}" if account else db_name
yield _sse(
{
# Emit a "processing" event so UI updates immediately for large db files.
yield _sse(
{
"type": "progress",
"current": overall_current,
"total": total_databases,
"success_count": success_count,
"fail_count": fail_count,
"current_file": current_file,
"status": "processing",
"message": "解密中...",
}
)
output_path = account_output_dir / db_name
task = asyncio.create_task(asyncio.to_thread(decryptor.decrypt_database, db_path, str(output_path)))
# Wait with heartbeat (can't yield while awaiting the thread directly).
last_heartbeat = time.time()
while not task.done():
if await request.is_disconnected():
return
now = time.time()
if now - last_heartbeat > 15:
last_heartbeat = now
# SSE comment heartbeat; browsers ignore but keeps proxies alive.
yield ": ping\n\n"
await asyncio.sleep(0.6)
try:
ok = bool(task.result())
except Exception:
ok = False
db_diagnostic = dict(getattr(decryptor, "last_result", {}) or {})
if not db_diagnostic:
db_diagnostic = {
"db_path": str(db_path),
"db_name": str(db_name),
"output_path": str(output_path),
"success": bool(ok),
}
db_diagnostic["account"] = str(account)
account_db_diagnostics[db_name] = db_diagnostic
if (
(not bool(db_diagnostic.get("success", ok)))
or int(db_diagnostic.get("failed_pages") or 0) > 0
or str(db_diagnostic.get("diagnostic_status") or "") != "ok"
):
account_diagnostic_warning_count += 1
if ok:
account_success += 1
success_count += 1
account_processed.append(str(output_path))
processed_files.append(str(output_path))
status = "success"
msg = "解密成功"
else:
account_failed.append(db_path)
failed_files.append(db_path)
fail_count += 1
status = "fail"
msg = "解密失败"
payload = {
"type": "progress",
"current": overall_current,
"total": total_databases,
@@ -249,78 +394,93 @@ async def decrypt_databases_stream(
"status": status,
"message": msg,
}
)
if db_diagnostic:
payload["diagnostic_status"] = str(db_diagnostic.get("diagnostic_status") or "")
payload["page_failures"] = int(db_diagnostic.get("failed_pages") or 0)
if db_diagnostic.get("failed_page_samples"):
payload["failed_page_samples"] = db_diagnostic.get("failed_page_samples")
if db_diagnostic.get("diagnostics"):
payload["diagnostics"] = db_diagnostic.get("diagnostics")
if overall_current % 5 == 0:
yield _sse(payload)
if overall_current % 5 == 0:
await asyncio.sleep(0)
account_results[account] = {
"total": len(dbs),
"success": account_success,
"failed": len(dbs) - account_success,
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed,
"db_diagnostics": account_db_diagnostics,
"diagnostic_warning_count": int(account_diagnostic_warning_count),
}
diagnostic_warning_count += int(account_diagnostic_warning_count)
# Build cache table (keep behavior consistent with the POST endpoint).
if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0":
yield _sse(
{
"type": "phase",
"phase": "session_last_message",
"account": account,
"message": "正在构建会话缓存(最后一条消息)...",
}
)
await asyncio.sleep(0)
account_results[account] = {
"total": len(dbs),
"success": account_success,
"failed": len(dbs) - account_success,
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed,
try:
from ..session_last_message import build_session_last_message_table
task = asyncio.create_task(
asyncio.to_thread(
build_session_last_message_table,
account_output_dir,
rebuild=True,
include_hidden=True,
include_official=True,
)
)
last_heartbeat = time.time()
while not task.done():
if await request.is_disconnected():
return
now = time.time()
if now - last_heartbeat > 15:
last_heartbeat = now
yield ": ping\n\n"
await asyncio.sleep(0.6)
account_results[account]["session_last_message"] = task.result()
except Exception as e:
account_results[account]["session_last_message"] = {"status": "error", "message": str(e)}
status = "completed" if success_count > 0 else "failed"
result = {
"status": status,
"total_databases": total_databases,
"success_count": success_count,
"failure_count": total_databases - success_count,
"output_directory": str(base_output_dir.absolute()),
"message": f"解密完成: 成功 {success_count}/{total_databases}",
"processed_files": processed_files,
"failed_files": failed_files,
"account_results": account_results,
"diagnostic_warning_count": int(diagnostic_warning_count),
}
# Build cache table (keep behavior consistent with the POST endpoint).
if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0":
yield _sse(
{
"type": "phase",
"phase": "session_last_message",
"account": account,
"message": "正在构建会话缓存(最后一条消息)...",
}
)
await asyncio.sleep(0)
# Save db key for frontend autofill.
try:
for account in (account_results or {}).keys():
upsert_account_keys_in_store(str(account), db_key=k)
except Exception:
pass
try:
from ..session_last_message import build_session_last_message_table
task = asyncio.create_task(
asyncio.to_thread(
build_session_last_message_table,
account_output_dir,
rebuild=True,
include_hidden=True,
include_official=True,
)
)
last_heartbeat = time.time()
while not task.done():
if await request.is_disconnected():
return
now = time.time()
if now - last_heartbeat > 15:
last_heartbeat = now
yield ": ping\n\n"
await asyncio.sleep(0.6)
account_results[account]["session_last_message"] = task.result()
except Exception as e:
account_results[account]["session_last_message"] = {"status": "error", "message": str(e)}
status = "completed" if success_count > 0 else "failed"
result = {
"status": status,
"total_databases": total_databases,
"success_count": success_count,
"failure_count": total_databases - success_count,
"output_directory": str(base_output_dir.absolute()),
"message": f"解密完成: 成功 {success_count}/{total_databases}",
"processed_files": processed_files,
"failed_files": failed_files,
"account_results": account_results,
}
# Save db key for frontend autofill.
try:
for account in (account_results or {}).keys():
upsert_account_keys_in_store(str(account), db_key=k)
except Exception:
pass
yield _sse({"type": "complete", **result})
yield _sse({"type": "complete", **result})
finally:
if decrypt_guards:
await asyncio.to_thread(_release_decrypt_account_guards, decrypt_guards, reason="decrypt:sse")
headers = {"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
return StreamingResponse(generate_progress(), media_type="text/event-stream", headers=headers)
@@ -0,0 +1,150 @@
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any, Mapping, Optional
SQLITE_HEADER = b"SQLite format 3\x00"
def _clean_text(value: Any, *, limit: int = 240) -> str:
text = " ".join(str(value or "").split()).strip()
if len(text) > limit:
return text[: limit - 3] + "..."
return text
def _clean_error(exc: BaseException, *, limit: int = 240) -> str:
text = _clean_text(exc, limit=limit)
if text:
return f"{type(exc).__name__}: {text}"
return type(exc).__name__
def _quote_ident(name: str) -> str:
return '"' + str(name or "").replace('"', '""') + '"'
def collect_sqlite_diagnostics(
path: str | Path,
*,
quick_check: bool = True,
table_name: Optional[str] = None,
table_sample_limit: int = 5,
) -> dict[str, Any]:
db_path = Path(path)
diagnostics: dict[str, Any] = {
"path": str(db_path),
"exists": bool(db_path.exists()),
}
if not diagnostics["exists"]:
return diagnostics
try:
diagnostics["size"] = int(db_path.stat().st_size)
except Exception as exc:
diagnostics["size_error"] = _clean_error(exc)
try:
with db_path.open("rb") as f:
header = f.read(len(SQLITE_HEADER))
diagnostics["header_ok"] = header == SQLITE_HEADER
diagnostics["header_hex"] = header.hex()
except Exception as exc:
diagnostics["header_error"] = _clean_error(exc)
if not quick_check:
return diagnostics
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(str(db_path))
try:
row = conn.execute("PRAGMA page_size").fetchone()
diagnostics["page_size"] = int((row[0] if row is not None else 0) or 0)
except Exception as exc:
diagnostics["page_size_error"] = _clean_error(exc)
try:
row = conn.execute("PRAGMA page_count").fetchone()
diagnostics["page_count"] = int((row[0] if row is not None else 0) or 0)
except Exception as exc:
diagnostics["page_count_error"] = _clean_error(exc)
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name").fetchall()
table_names = [str(row[0]) for row in rows if row and row[0]]
diagnostics["table_count"] = len(table_names)
if table_names:
diagnostics["tables_sample"] = table_names[: max(int(table_sample_limit or 0), 1)]
except Exception as exc:
diagnostics["table_list_error"] = _clean_error(exc)
if table_name:
diagnostics["target_table"] = str(table_name)
try:
cols = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall()
diagnostics["target_table_exists"] = bool(cols)
if cols:
diagnostics["target_table_columns"] = [
str(col[1])
for col in cols[:8]
if len(col) > 1 and str(col[1] or "").strip()
]
except Exception as exc:
diagnostics["target_table_error"] = _clean_error(exc)
try:
rows = conn.execute("PRAGMA quick_check").fetchall()
values = [_clean_text(row[0]) for row in rows if row and row[0] is not None]
if values:
diagnostics["quick_check"] = values[0] if len(values) == 1 else values[:5]
diagnostics["quick_check_ok"] = len(values) == 1 and values[0].lower() == "ok"
if len(values) > 5:
diagnostics["quick_check_truncated"] = len(values) - 5
else:
diagnostics["quick_check"] = ""
diagnostics["quick_check_ok"] = None
except Exception as exc:
diagnostics["quick_check_error"] = _clean_error(exc)
diagnostics["quick_check_ok"] = False
except Exception as exc:
diagnostics["connect_error"] = _clean_error(exc)
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
return diagnostics
def sqlite_diagnostics_status(diagnostics: Mapping[str, Any]) -> str:
if not diagnostics:
return "not_run"
if not diagnostics.get("exists", False):
return "missing"
if diagnostics.get("header_ok") is False:
return "bad_header"
if diagnostics.get("connect_error"):
return "connect_error"
if diagnostics.get("quick_check_error"):
return "quick_check_error"
if diagnostics.get("quick_check_ok") is False:
return "quick_check_failed"
if diagnostics.get("quick_check_ok") is True:
return "ok"
return "header_only"
def format_sqlite_diagnostics(diagnostics: Mapping[str, Any]) -> str:
compact: dict[str, Any] = {}
for key, value in diagnostics.items():
if value in (None, "", [], {}):
continue
compact[str(key)] = value
return json.dumps(compact, ensure_ascii=False, sort_keys=True)
+123 -6
View File
@@ -21,6 +21,7 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .app_paths import get_output_databases_dir
from .sqlite_diagnostics import collect_sqlite_diagnostics, sqlite_diagnostics_status
# 注意:不再支持默认密钥,所有密钥必须通过参数传入
@@ -221,6 +222,7 @@ class WeChatDatabaseDecryptor:
self.key_bytes = bytes.fromhex(key_hex)
except ValueError:
raise ValueError("密钥必须是有效的十六进制字符串")
self.last_result: dict = {}
def decrypt_database(self, db_path: str, output_path: str) -> bool:
"""解密微信4.x版本数据库
@@ -234,6 +236,79 @@ class WeChatDatabaseDecryptor:
from .logging_config import get_logger
logger = get_logger(__name__)
result = {
"db_path": str(db_path),
"db_name": Path(str(db_path)).name,
"output_path": str(output_path),
"success": False,
"copied_as_sqlite": False,
"input_size": 0,
"output_size": 0,
"total_pages": 0,
"successful_pages": 0,
"failed_pages": 0,
"failed_page_samples": [],
"failure_reasons": {},
"diagnostics": {},
"diagnostic_status": "not_run",
"error": "",
}
self.last_result = result
def _append_failed_page(page_num: int, reason: str, error: str = "") -> None:
result["failure_reasons"][reason] = int(result["failure_reasons"].get(reason) or 0) + 1
if len(result["failed_page_samples"]) >= 8:
return
item = {"page": int(page_num), "reason": str(reason)}
err = " ".join(str(error or "").split()).strip()
if err:
item["error"] = err[:200]
result["failed_page_samples"].append(item)
def _finalize(success: bool, error: str = "") -> bool:
result["success"] = bool(success)
if error:
result["error"] = " ".join(str(error).split()).strip()
output_file = Path(str(output_path))
if output_file.exists():
try:
result["output_size"] = int(output_file.stat().st_size)
except Exception:
pass
diagnostics = collect_sqlite_diagnostics(output_file, quick_check=True)
result["diagnostics"] = diagnostics
result["diagnostic_status"] = sqlite_diagnostics_status(diagnostics)
payload = {
"db_name": result["db_name"],
"db_path": result["db_path"],
"output_path": result["output_path"],
"success": result["success"],
"copied_as_sqlite": result["copied_as_sqlite"],
"input_size": result["input_size"],
"output_size": result["output_size"],
"total_pages": result["total_pages"],
"successful_pages": result["successful_pages"],
"failed_pages": result["failed_pages"],
"failure_reasons": result["failure_reasons"],
"failed_page_samples": result["failed_page_samples"],
"diagnostic_status": result["diagnostic_status"],
"diagnostics": result["diagnostics"],
"error": result["error"],
}
log_fn = logger.info
if (
(not result["success"])
or int(result["failed_pages"] or 0) > 0
or str(result["diagnostic_status"] or "") != "ok"
):
log_fn = logger.warning
log_fn("[decrypt.diagnostic] %s", json.dumps(payload, ensure_ascii=False, sort_keys=True))
self.last_result = result
return bool(success)
logger.info(f"开始解密数据库: {db_path}")
try:
@@ -241,17 +316,19 @@ class WeChatDatabaseDecryptor:
encrypted_data = f.read()
logger.info(f"读取文件大小: {len(encrypted_data)} bytes")
result["input_size"] = int(len(encrypted_data))
if len(encrypted_data) < 4096:
logger.warning(f"文件太小,跳过解密: {db_path}")
return False
return _finalize(False, "file_too_small")
# 检查是否已经是解密的数据库
if encrypted_data.startswith(SQLITE_HEADER):
logger.info(f"文件已是SQLite格式,直接复制: {db_path}")
with open(output_path, 'wb') as f:
f.write(encrypted_data)
return True
result["copied_as_sqlite"] = True
return _finalize(True)
# 提取salt (前16字节)
salt = encrypted_data[:16]
@@ -295,6 +372,7 @@ class WeChatDatabaseDecryptor:
total_pages = len(encrypted_data) // page_size
successful_pages = 0
failed_pages = 0
result["total_pages"] = int(total_pages)
# 逐页解密
for cur_page in range(total_pages):
@@ -329,6 +407,7 @@ class WeChatDatabaseDecryptor:
if stored_hmac != expected_hmac:
logger.warning(f"页面 {page_num} HMAC验证失败")
failed_pages += 1
_append_failed_page(page_num, "hmac")
continue
# 提取IV和加密数据用于AES解密
@@ -354,20 +433,32 @@ class WeChatDatabaseDecryptor:
except Exception as e:
logger.error(f"页面 {page_num} AES解密失败: {e}")
failed_pages += 1
_append_failed_page(page_num, "aes", str(e))
continue
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages}")
result["successful_pages"] = int(successful_pages)
result["failed_pages"] = int(failed_pages)
# 写入解密后的文件
with open(output_path, 'wb') as f:
f.write(decrypted_data)
logger.info(f"解密文件大小: {len(decrypted_data)} bytes")
return True
if failed_pages > 0:
logger.warning(
"解密输出包含页失败: db=%s total_pages=%s failed_pages=%s failure_reasons=%s samples=%s",
result["db_name"],
int(total_pages),
int(failed_pages),
json.dumps(result["failure_reasons"], ensure_ascii=False, sort_keys=True),
json.dumps(result["failed_page_samples"], ensure_ascii=False),
)
return _finalize(True)
except Exception as e:
logger.error(f"解密失败: {db_path}, 错误: {e}")
return False
return _finalize(False, str(e))
def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> dict:
"""
@@ -493,6 +584,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
processed_files = []
failed_files = []
account_results = {}
diagnostic_warning_count = 0
for account_name, databases in account_databases.items():
logger.info(f"开始解密账号 {account_name}{len(databases)} 个数据库")
@@ -523,6 +615,8 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
account_success = 0
account_processed = []
account_failed = []
account_db_diagnostics = {}
account_diagnostic_warning_count = 0
for db_info in databases:
db_path = db_info['path']
@@ -533,7 +627,26 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
# 解密数据库
logger.info(f"解密 {account_name}/{db_name}")
if decryptor.decrypt_database(db_path, str(output_path)):
ok = decryptor.decrypt_database(db_path, str(output_path))
db_diagnostic = dict(getattr(decryptor, "last_result", {}) or {})
if not db_diagnostic:
db_diagnostic = {
"db_path": str(db_path),
"db_name": str(db_name),
"output_path": str(output_path),
"success": bool(ok),
}
db_diagnostic["account"] = str(account_name)
account_db_diagnostics[db_name] = db_diagnostic
if (
(not bool(db_diagnostic.get("success", ok)))
or int(db_diagnostic.get("failed_pages") or 0) > 0
or str(db_diagnostic.get("diagnostic_status") or "") != "ok"
):
account_diagnostic_warning_count += 1
if ok:
account_success += 1
success_count += 1
account_processed.append(str(output_path))
@@ -551,8 +664,11 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed": len(databases) - account_success,
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed
"failed_files": account_failed,
"db_diagnostics": account_db_diagnostics,
"diagnostic_warning_count": int(account_diagnostic_warning_count),
}
diagnostic_warning_count += int(account_diagnostic_warning_count)
# 构建“会话最后一条消息”缓存表:把耗时挪到解密阶段,后续会话列表直接查表
if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0":
@@ -586,6 +702,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed_files": failed_files,
"account_results": account_results, # 新增:按账号的详细结果
"detected_accounts": detected_accounts,
"diagnostic_warning_count": int(diagnostic_warning_count),
}
logger.info("=" * 60)