Compare commits

..

10 Commits

10 changed files with 1276 additions and 258 deletions
+394
View File
@@ -0,0 +1,394 @@
<template>
<div class="biz-page h-full min-h-0 flex overflow-hidden" style="background-color: var(--app-shell-bg)">
<div :class="['w-[300px] lg:w-[320px] border-r flex flex-col flex-shrink-0 z-10', isDark ? 'bg-[#1e1e1e] border-[#333]' : 'bg-white border-gray-200']">
<div class="p-3 border-b" :class="isDark ? 'border-[#333]' : 'border-gray-200'" style="background-color: var(--app-surface-muted)">
<div class="contact-search-wrapper flex-1">
<input
v-model="searchQuery"
type="text"
class="contact-search-input"
placeholder="搜索服务号"
/>
</div>
</div>
<div class="flex-1 overflow-y-auto min-h-0">
<div v-if="loadingAccounts" class="flex justify-center py-4">
<span class="text-sm" :class="isDark ? 'text-gray-500' : 'text-gray-400'">加载中...</span>
</div>
<div v-else class="pb-4">
<div
v-for="item in filteredAccounts"
:key="item.username"
@click="selectAccount(item)"
class="flex items-center gap-3 px-4 py-3 cursor-pointer transition-colors border-b"
:class="[
isDark ? 'border-[#333]' : 'border-gray-50',
selectedBizAccount?.username === item.username
? (isDark ? 'bg-[#333]' : 'bg-[#E5E5E5]') // 选中状态
: item.username === 'gh_3dfda90e39d6'
? (isDark ? 'bg-[#2a2a2a] hover:bg-[#333]' : 'bg-[#F2F2F2] hover:bg-[#EAEAEA]') // 微信支付专门的底色
: (isDark ? 'hover:bg-[#252525]' : 'hover:bg-gray-50') // 普通悬浮色
]"
>
<img v-if="item.avatar" :src="api.getBizProxyImageUrl(item.avatar)" :class="['w-10 h-10 rounded-md object-cover flex-shrink-0', isDark ? 'bg-[#333]' : 'bg-gray-200']" alt=""/>
<div v-else class="w-10 h-10 rounded-md bg-[#03C160] text-white flex items-center justify-center text-lg font-medium flex-shrink-0 shadow-sm">
{{ (item.name || item.username).charAt(0).toUpperCase() }}
</div>
<div class="flex-1 min-w-0 flex flex-col justify-center gap-0.5">
<div class="flex justify-between items-center">
<h3 class="text-sm truncate" :class="isDark ? 'text-gray-100' : 'text-gray-900'">{{ item.name || item.username }}</h3>
<span v-if="item.formatted_last_time" class="text-[11px] flex-shrink-0 ml-2" :class="isDark ? 'text-gray-500' : 'text-gray-400'">
{{ item.formatted_last_time }}
</span>
</div>
<div
class="text-[10px] px-1.5 py-0.5 rounded w-max mt-0.5"
:class="[
item.type === 1 ? (isDark ? 'text-[#03C160] bg-[#03C160]/20' : 'text-[#03C160] bg-[#03C160]/10') : // 服务号
item.type === 0 ? (isDark ? 'text-blue-400 bg-blue-900/40' : 'text-blue-500 bg-blue-50') : // 公众号
item.type === 2 ? (isDark ? 'text-orange-400 bg-orange-900/40' : 'text-orange-500 bg-orange-50') : // 企业号
(isDark ? 'text-gray-400 bg-gray-700/50' : 'text-gray-400 bg-gray-100') // 未知
]"
>
{{ {1: '服务号', 0: '公众号', 2: '企业号', 3: '未知'}[item.type] || '未知' }}
</div>
</div>
</div>
</div>
</div>
</div>
<div class="flex-1 flex flex-col min-h-0 min-w-0" :class="isDark ? 'bg-[#121212]' : 'bg-[#F5F5F5]'">
<div v-if="selectedBizAccount" class="flex-1 flex flex-col min-h-0 relative">
<div class="h-14 border-b flex items-center px-5 shrink-0 z-10" :class="isDark ? 'bg-[#121212] border-[#333]' : 'bg-[#F5F5F5] border-gray-200'">
<h2 class="text-base" :class="isDark ? 'text-gray-100' : 'text-gray-900'">{{ selectedBizAccount.name }}</h2>
</div>
<div class="flex-1 overflow-y-auto px-4 py-6 flex flex-col-reverse" @scroll="handleScroll" ref="messageListRef">
<div class="h-4 shrink-0" aria-hidden="true"></div>
<div v-if="!hasMore" class="text-center text-xs py-4 w-full" :class="isDark ? 'text-gray-500' : 'text-gray-400'">没有更多消息了</div>
<div v-if="loadingMessages" class="text-center text-xs py-4 w-full" :class="isDark ? 'text-gray-500' : 'text-gray-400'">正在加载...</div>
<div class="w-full max-w-[400px] mx-auto flex flex-col-reverse gap-6">
<div v-for="msg in messages" :key="msg.local_id" class="w-full">
<div v-if="selectedBizAccount.username === 'gh_3dfda90e39d6'" class="rounded-xl shadow-sm p-5 border" :class="isDark ? 'bg-[#1e1e1e] border-[#333]' : 'bg-white border-gray-100'">
<div class="flex items-center text-sm mb-5" :class="isDark ? 'text-gray-400' : 'text-gray-500'">
<img v-if="msg.merchant_icon" :src="api.getBizProxyImageUrl(msg.merchant_icon)" class="w-6 h-6 rounded-full mr-2 object-cover" alt=""/>
<div v-else class="w-6 h-6 rounded-full mr-2 flex items-center justify-center" :class="isDark ? 'bg-green-900/40 text-green-400' : 'bg-green-100 text-green-600'">¥</div>
<span>{{ msg.merchant_name || '微信支付' }}</span>
</div>
<div class="text-center mb-6">
<h3 class="text-[22px] font-medium mb-1" :class="isDark ? 'text-gray-100' : 'text-gray-900'">{{ msg.title }}</h3>
</div>
<div class="text-[13px] whitespace-pre-wrap leading-relaxed" :class="isDark ? 'text-gray-400' : 'text-gray-500'">
{{ msg.description }}
</div>
<div class="mt-4 pt-3 border-t text-[12px] text-right" :class="isDark ? 'border-[#333] text-gray-500' : 'border-gray-100 text-gray-400'">
{{ msg.formatted_time }}
</div>
</div>
<div v-else class="rounded-xl shadow-sm overflow-hidden border" :class="isDark ? 'bg-[#1e1e1e] border-[#333]' : 'bg-white border-gray-100'">
<a :href="msg.url" target="_blank" class="block relative group cursor-pointer">
<img :src="msg.cover ? api.getBizProxyImageUrl(msg.cover) : defaultImage" :class="['w-full h-[180px] object-cover', isDark ? 'bg-[#333]' : 'bg-gray-100']" alt=""/>
<div class="absolute bottom-0 left-0 right-0 bg-gradient-to-t from-black/80 to-transparent p-3 pt-8">
<h3 class="text-white text-[15px] font-medium leading-snug line-clamp-2 group-hover:underline">
{{ msg.title }}
</h3>
</div>
</a>
<div v-if="msg.des" class="px-4 py-3 text-[13px] border-b" :class="isDark ? 'text-gray-400 border-[#333]' : 'text-gray-500 border-gray-50'">
{{ msg.des }}
</div>
<div v-if="msg.content_list && msg.content_list.length > 1" class="flex flex-col">
<a
v-for="(item, idx) in msg.content_list.slice(1)"
:key="idx"
:href="item.url"
target="_blank"
class="flex items-center justify-between p-3 border-t hover:bg-opacity-50 cursor-pointer group"
:class="isDark ? 'border-[#333] hover:bg-[#252525]' : 'border-gray-100 hover:bg-gray-50'"
>
<span class="text-[14px] leading-snug line-clamp-2 pr-3 group-hover:underline" :class="isDark ? 'text-gray-200' : 'text-gray-800'">
{{ item.title }}
</span>
<img :src="item.cover ? api.getBizProxyImageUrl(item.cover) : defaultImage" :class="['w-12 h-12 rounded object-cover flex-shrink-0 border', isDark ? 'bg-[#333] border-[#444]' : 'bg-gray-100 border-gray-100']" alt=""/>
</a>
</div>
</div>
</div>
</div>
</div>
</div>
<div v-else class="flex-1 flex items-center justify-center">
<div class="text-center">
<div class="w-20 h-20 mx-auto mb-5 rounded-2xl flex items-center justify-center" :class="isDark ? 'bg-[#2a2a2a]' : 'bg-gray-200/50'">
<svg class="w-10 h-10" :class="isDark ? 'text-gray-600' : 'text-gray-400'" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="1.5" d="M19 20H5a2 2 0 01-2-2V6a2 2 0 012-2h10a2 2 0 012 2v1m2 13a2 2 0 01-2-2V7m2 13a2 2 0 002-2V9.5L18.5 7H20" />
</svg>
</div>
<p class="text-sm" :class="isDark ? 'text-gray-500' : 'text-gray-400'">请选择一个服务号查看消息</p>
</div>
</div>
</div>
</div>
</template>
<script setup>
import { ref, computed, onMounted, watch } from 'vue'
import { useApi } from '~/composables/useApi'
const api = useApi()
import { storeToRefs } from 'pinia'
import { useThemeStore } from '~/stores/theme'
import { useChatAccountsStore } from '~/stores/chatAccounts'
import { useChatRealtimeStore } from '~/stores/chatRealtime'
const accounts = ref([])
const loadingAccounts = ref(false)
const searchQuery = ref('')
const selectedBizAccount = ref(null)
const themeStore = useThemeStore()
const chatAccountsStore = useChatAccountsStore()
const realtimeStore = useChatRealtimeStore()
const { isDark } = storeToRefs(themeStore)
const { selectedAccount: selectedDbAccount } = storeToRefs(chatAccountsStore)
const { enabled: realtimeEnabled, changeSeq } = storeToRefs(realtimeStore)
const messages = ref([])
const loadingMessages = ref(false)
const offset = ref(0)
const limit = 20
const hasMore = ref(true)
const messageListRef = ref(null)
let realtimeRefreshFuture = null
let realtimeRefreshQueued = false
// 默认占位图
// const defaultAvatar = 'data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MCIgaGVpZ2h0PSI0MCIgdmlld0JveD0iMCAwIDQwIDQwIj48cmVjdCB3aWR0aD0iNDAiIGhlaWdodD0iNDAiIGZpbGw9IiNlNWU3ZWIiLz48L3N2Zz4='
const defaultImage = 'data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MDAiIGhlaWdodD0iMTgwIj48cmVjdCB3aWR0aD0iNDAwIiBoZWlnaHQ9IjE4MCIgZmlsbD0iI2Y1ZjVmNSIvPjwvc3ZnPg=='
const getCurrentAccountParam = () => {
const account = String(selectedDbAccount.value || '').trim()
return account || undefined
}
const resetMessagesState = () => {
messages.value = []
offset.value = 0
hasMore.value = true
}
const fetchAccounts = async ({ preserveSelection = true } = {}) => {
loadingAccounts.value = true
const previousUsername = preserveSelection ? String(selectedBizAccount.value?.username || '').trim() : ''
try {
const res = await api.listBizAccounts({ account: getCurrentAccountParam() })
const nextAccounts = Array.isArray(res?.data) ? res.data : []
accounts.value = nextAccounts
if (previousUsername) {
selectedBizAccount.value = nextAccounts.find(item => item.username === previousUsername) || null
} else if (!selectedBizAccount.value?.username) {
selectedBizAccount.value = null
}
} catch (err) {
accounts.value = []
selectedBizAccount.value = null
console.error('获取服务号失败:', err)
} finally {
loadingAccounts.value = false
}
}
// 搜索过滤
const filteredAccounts = computed(() => {
if (!searchQuery.value) return accounts.value
const q = searchQuery.value.toLowerCase()
return accounts.value.filter(a =>
(a.name && a.name.toLowerCase().includes(q)) ||
(a.username && a.username.toLowerCase().includes(q))
)
})
// 点击选择服务号
const selectAccount = async (account) => {
if (selectedBizAccount.value?.username === account.username) return
selectedBizAccount.value = account
// 重置消息状态
resetMessagesState()
await loadMessages()
}
// 加载消息
const loadMessages = async () => {
if (loadingMessages.value || !hasMore.value || !selectedBizAccount.value) return
loadingMessages.value = true
try {
const username = selectedBizAccount.value.username
const params = {
account: getCurrentAccountParam(),
username,
offset: offset.value,
limit,
}
let res
if (username === 'gh_3dfda90e39d6') {
res = await api.listBizPayRecords(params)
} else {
res = await api.listBizMessages(params)
}
if (res && res.data) {
if (res.data.length < limit) {
hasMore.value = false
}
// 追加数据
messages.value.push(...res.data)
offset.value += limit
}
} catch (err) {
console.error('加载消息失败:', err)
} finally {
loadingMessages.value = false
}
}
const reloadSelectedMessages = async () => {
if (!selectedBizAccount.value) return
resetMessagesState()
await loadMessages()
}
const syncAllBizRealtime = async ({ forceReload = false } = {}) => {
const priorityUsername = String(selectedBizAccount.value?.username || '').trim()
if (!realtimeEnabled.value) {
if (forceReload) {
await reloadSelectedMessages()
}
return
}
try {
const result = await api.syncChatRealtimeAll({
account: getCurrentAccountParam(),
max_scan: 200,
priority_username: priorityUsername,
priority_max_scan: 400,
include_hidden: true,
include_official: true,
only_official: true,
backfill_limit: 0,
})
const hasDelta = Number(result?.insertedTotal || 0) > 0 || Number(result?.sessionsUpdated || 0) > 0
await fetchAccounts({ preserveSelection: true })
if (selectedBizAccount.value?.username) {
if (hasDelta || forceReload) {
await reloadSelectedMessages()
}
} else if (forceReload) {
resetMessagesState()
}
} catch (err) {
console.error('实时同步服务号失败:', err)
if (forceReload) {
await fetchAccounts({ preserveSelection: true })
await reloadSelectedMessages()
}
}
}
const queueRealtimeBizRefresh = () => {
if (!realtimeEnabled.value) return
if (realtimeRefreshFuture) {
realtimeRefreshQueued = true
return
}
realtimeRefreshFuture = syncAllBizRealtime().finally(() => {
realtimeRefreshFuture = null
if (realtimeRefreshQueued) {
realtimeRefreshQueued = false
queueRealtimeBizRefresh()
}
})
}
// 向上滚动加载逻辑
// 因为容器设置了 flex-col-reverse,所以 scrollTop 越靠近负值(或0取决于浏览器)越是到了历史消息端
// 但比较通用兼容的做法是监听 scroll,距离顶部或底部小于阈值时触发
const handleScroll = (e) => {
const target = e.target
// 针对 flex-col-reverse: 滚动到底部实际上是视觉上的最上方(历史消息)
// 当 scrollHeight - Math.abs(scrollTop) - clientHeight < 50 时加载
if (target.scrollHeight - Math.abs(target.scrollTop) - target.clientHeight < 50) {
loadMessages()
}
}
watch(selectedDbAccount, async (next, prev) => {
if (String(next || '').trim() === String(prev || '').trim()) return
selectedBizAccount.value = null
resetMessagesState()
searchQuery.value = ''
if (!String(next || '').trim()) {
accounts.value = []
return
}
await fetchAccounts({ preserveSelection: false })
if (realtimeEnabled.value) {
await syncAllBizRealtime({ forceReload: true })
}
})
watch(changeSeq, (next, prev) => {
if (!realtimeEnabled.value) return
if (next === prev) return
queueRealtimeBizRefresh()
})
watch(realtimeEnabled, async (enabled, wasEnabled) => {
if (enabled && !wasEnabled) {
await syncAllBizRealtime({ forceReload: true })
}
})
onMounted(async () => {
await chatAccountsStore.ensureLoaded()
await fetchAccounts({ preserveSelection: false })
if (realtimeEnabled.value) {
await syncAllBizRealtime({ forceReload: true })
}
})
</script>
<style scoped>
/* 隐藏滚动条但允许滚动(可选) */
.overflow-y-auto::-webkit-scrollbar {
width: 6px;
}
.overflow-y-auto::-webkit-scrollbar-track {
background: transparent;
}
.overflow-y-auto::-webkit-scrollbar-thumb {
background-color: rgba(0,0,0,0.1);
border-radius: 10px;
}
</style>
+24 -26
View File
@@ -101,6 +101,21 @@
</div>
</div>
<div
class="sidebar-rail-action w-full h-[var(--sidebar-rail-step)] flex items-center justify-center cursor-pointer group"
title="服务号"
@click="goBiz"
>
<div class="sidebar-rail-plate w-[var(--sidebar-rail-btn)] h-[var(--sidebar-rail-btn)] rounded-md flex items-center justify-center transition-colors bg-transparent">
<div class="sidebar-rail-icon w-[var(--sidebar-rail-icon)] h-[var(--sidebar-rail-icon)]" :class="{ 'sidebar-rail-icon-active': isBizRoute }">
<svg class="w-full h-full" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.8" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true">
<path d="M11 5L6 9H2v6h4l5 4V5z"></path>
<path d="M19.07 4.93a10 10 0 0 1 0 14.14M15.54 8.46a5 5 0 0 1 0 7.07"></path>
</svg>
</div>
</div>
</div>
<!-- Wrapped -->
<div
class="sidebar-rail-action w-full h-[var(--sidebar-rail-step)] flex items-center justify-center cursor-pointer group"
@@ -479,34 +494,17 @@ const isChatRoute = computed(() => route.path?.startsWith('/chat'))
const isEditsRoute = computed(() => route.path?.startsWith('/edits'))
const isSnsRoute = computed(() => route.path?.startsWith('/sns'))
const isContactsRoute = computed(() => route.path?.startsWith('/contacts'))
const isBizRoute = computed(() => route.path?.startsWith('/biz')) // 新增
const isWrappedRoute = computed(() => route.path?.startsWith('/wrapped'))
const goChat = async () => {
await navigateTo('/chat')
}
const goEdits = async () => {
await navigateTo('/edits')
}
const goSns = async () => {
await navigateTo('/sns')
}
const goContacts = async () => {
await navigateTo('/contacts')
}
const goWrapped = async () => {
await navigateTo('/wrapped')
}
const goGuide = async () => {
await navigateTo('/')
}
const goSettings = () => {
openSettingsDialog()
}
const goChat = async () => { await navigateTo('/chat') }
const goEdits = async () => { await navigateTo('/edits') }
const goSns = async () => { await navigateTo('/sns') }
const goContacts = async () => { await navigateTo('/contacts') }
const goBiz = async () => { await navigateTo('/biz') }
const goWrapped = async () => { await navigateTo('/wrapped') }
const goGuide = async () => { await navigateTo('/') }
const goSettings = () => { openSettingsDialog() }
const onWindowKeydown = (event) => {
if (event?.key !== 'Escape') return
@@ -69,7 +69,7 @@
<div v-else-if="contacts.length === 0" class="session-list-status px-3 py-2 text-sm">
暂无会话
</div>
<template v-else>
<div v-else class="pb-4">
<div v-for="contact in filteredContacts" :key="contact.id"
class="session-list-item px-3 cursor-pointer transition-colors duration-150 h-[calc(80px/var(--dpr))] flex items-center"
:class="{
@@ -118,7 +118,7 @@
</div>
</div>
</div>
</template>
</div>
</div>
</div>
+44
View File
@@ -205,6 +205,8 @@ export const useApi = () => {
if (params && params.priority_max_scan != null) query.set('priority_max_scan', String(params.priority_max_scan))
if (params && params.include_hidden != null) query.set('include_hidden', String(!!params.include_hidden))
if (params && params.include_official != null) query.set('include_official', String(!!params.include_official))
if (params && params.only_official != null) query.set('only_official', String(!!params.only_official))
if (params && params.backfill_limit != null) query.set('backfill_limit', String(params.backfill_limit))
const url = '/chat/realtime/sync_all' + (query.toString() ? `?${query.toString()}` : '')
return await request(url, { method: 'POST' })
}
@@ -561,6 +563,44 @@ export const useApi = () => {
return await request('/get_image_key')
}
// 枚举服务号信息
const listBizAccounts = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
const url = '/biz/list' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
// 获取普通服务号消息
const listBizMessages = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
if (params && params.username) query.set('username', params.username)
if (params && params.limit != null) query.set('limit', String(params.limit))
if (params && params.offset != null) query.set('offset', String(params.offset))
const url = '/biz/messages' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
// 获取微信支付记录
const listBizPayRecords = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
if (params && params.limit != null) query.set('limit', String(params.limit))
if (params && params.offset != null) query.set('offset', String(params.offset))
const url = '/biz/pay_records' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
const getBizProxyImageUrl = (url) => {
if (!url) return ''
if (url.startsWith('data:')) return url // 如果已经是 base64,不处理
const query = new URLSearchParams()
query.set('url', url)
const base = baseURL ? baseURL.replace(/\/$/, '') : ''
return `${base}/biz/proxy_image?${query.toString()}`
}
return {
detectWechat,
detectCurrentAccount,
@@ -616,5 +656,9 @@ export const useApi = () => {
getKeys,
getImageKey,
getWxStatus,
listBizAccounts,
listBizMessages,
listBizPayRecords,
getBizProxyImageUrl,
}
}
+16
View File
@@ -0,0 +1,16 @@
<template>
<div class="h-full min-h-0 flex overflow-hidden bg-white">
<div class="flex-1 min-w-0">
<BizMessages />
</div>
</div>
</template>
<script setup>
import BizMessages from "../components/BizMessages.vue";
useHead({
title: '服务号消息 - WeChatDataAnalysis'
})
</script>
+2
View File
@@ -36,6 +36,7 @@ from .routers.wrapped import router as _wrapped_router
from .request_logging import log_server_errors_middleware
from .sns_stage_timing import add_sns_stage_timing_headers
from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown
from .routers.biz import router as _biz_router
app = FastAPI(
title="微信数据库解密工具",
@@ -96,6 +97,7 @@ app.include_router(_chat_media_router)
app.include_router(_sns_router)
app.include_router(_sns_export_router)
app.include_router(_wrapped_router)
app.include_router(_biz_router)
class _SPAStaticFiles(StaticFiles):
+64 -8
View File
@@ -41,6 +41,22 @@ class ColoredFormatter(logging.Formatter):
return formatted
def _can_use_logging_stream(stream) -> bool:
try:
if stream is None or getattr(stream, "closed", False):
return False
except Exception:
return False
try:
stream.write("")
stream.flush()
except Exception:
return False
return True
class WeChatLogger:
"""微信解密工具统一日志管理器"""
@@ -64,6 +80,12 @@ class WeChatLogger:
if env_level:
log_level = env_level
console_logging_env = str(os.environ.get("WECHAT_TOOL_ENABLE_CONSOLE_LOG", "") or "").strip().lower()
console_logging_forced = console_logging_env in {"1", "true", "yes", "on"}
console_logging_disabled = console_logging_env in {"0", "false", "no", "off"}
level = getattr(logging, str(log_level or "INFO").upper(), logging.INFO)
# 创建日志目录
now = datetime.now()
from .app_paths import get_output_dir
@@ -73,10 +95,41 @@ class WeChatLogger:
# 设置日志文件名
date_str = now.strftime("%d")
self.log_file = log_dir / f"{date_str}_wechat_tool.log"
desired_log_file = log_dir / f"{date_str}_wechat_tool.log"
root_logger = logging.getLogger()
wants_console_handler = _can_use_logging_stream(sys.stdout)
if getattr(sys, "frozen", False) and not console_logging_forced:
wants_console_handler = False
if console_logging_disabled:
wants_console_handler = False
if WeChatLogger._initialized:
current_log_file = Path(getattr(self, "log_file", desired_log_file))
has_expected_file_handler = False
has_stream_handler = False
for handler in root_logger.handlers:
if isinstance(handler, logging.FileHandler):
try:
if Path(handler.baseFilename).resolve() == desired_log_file.resolve():
has_expected_file_handler = True
except Exception:
if Path(handler.baseFilename) == desired_log_file:
has_expected_file_handler = True
elif isinstance(handler, logging.StreamHandler):
has_stream_handler = True
if (
current_log_file == desired_log_file
and root_logger.level == level
and has_expected_file_handler
and (has_stream_handler or not wants_console_handler)
):
self.log_file = desired_log_file
return self.log_file
self.log_file = desired_log_file
# 清除现有的处理器
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
try:
@@ -100,18 +153,20 @@ class WeChatLogger:
# 文件处理器
file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
file_handler.setFormatter(file_formatter)
level = getattr(logging, str(log_level or "INFO").upper(), logging.INFO)
file_handler.setLevel(level)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(level)
console_handler = None
if wants_console_handler:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(level)
# 配置根日志器
root_logger.setLevel(level)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
if console_handler is not None:
root_logger.addHandler(console_handler)
# 只为uvicorn日志器添加文件处理器,保持其原有的控制台处理器(带颜色)
uvicorn_logger = logging.getLogger("uvicorn")
@@ -158,7 +213,8 @@ class WeChatLogger:
except Exception:
pass
fastapi_logger.addHandler(file_handler)
fastapi_logger.addHandler(console_handler)
if console_handler is not None:
fastapi_logger.addHandler(console_handler)
fastapi_logger.setLevel(level)
# 记录初始化信息
+370
View File
@@ -0,0 +1,370 @@
import hashlib
import sqlite3
import time
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional, Any, Dict, List
import urllib
from fastapi import APIRouter, HTTPException, Response
from pydantic import BaseModel
from ..chat_helpers import _resolve_account_dir
from ..path_fix import PathFixRoute
from ..logging_config import get_logger
try:
import zstandard as zstd
except Exception:
zstd = None
logger = get_logger(__name__)
router = APIRouter(route_class=PathFixRoute)
def decompress_zstd_content(data: bytes, source_id: str, local_id: int) -> Optional[bytes]:
"""Zstandard 解压逻辑"""
if not data or not data.startswith(b'\x28\xb5\x2f\xfd'):
return None
try:
if zstd:
dctx = zstd.ZstdDecompressor()
return dctx.decompress(data, max_output_size=10 * 1024 * 1024)
except Exception as e:
error_msg = f"❌ [解压失败] 服务号id: {source_id}, local_id: {local_id} -> {e}"
print(error_msg)
logger.error(error_msg)
return None
def extract_xml_from_db_content(content: Any, source_id: str, local_id: int) -> str:
"""提取并解压数据库内容"""
if not content:
return ""
if isinstance(content, memoryview):
content = content.tobytes()
elif isinstance(content, str):
content = content.encode('utf-8', errors='ignore')
if isinstance(content, bytes):
decompressed = decompress_zstd_content(content, source_id, local_id)
if decompressed:
return decompressed.decode('utf-8', errors='ignore')
# 若不是 zstd 压缩或解压失败,尝试直接 decode
try:
return content.decode('utf-8', errors='ignore')
except Exception:
return ""
return ""
def parse_wechat_xml_to_struct(xml_str: str, source_id: str, local_id: int) -> Optional[Dict[str, Any]]:
"""解析微信服务号 XML 到 Dict"""
if not xml_str.strip():
return None
try:
root = ET.fromstring(xml_str)
def get_tag_text(element, path, default=""):
node = element.find(path)
return node.text if node is not None and node.text else default
main_cover = get_tag_text(root, ".//appmsg/thumburl")
if not main_cover:
main_cover = get_tag_text(root, ".//topnew/cover")
result = {
"title": get_tag_text(root, ".//appmsg/title"),
"des": get_tag_text(root, ".//appmsg/des"),
"url": get_tag_text(root, ".//appmsg/url"),
"cover": main_cover,
"content_list": []
}
items = root.findall(".//mmreader/category/item")
for item in items:
item_struct = {
"title": get_tag_text(item, "title"),
"url": get_tag_text(item, "url"),
"cover": get_tag_text(item, "cover"),
"summary": get_tag_text(item, "summary")
}
if item_struct["title"]:
result["content_list"].append(item_struct)
return result
except Exception as e:
error_msg = f"❌ [解析XML失败] 服务号id: {source_id}, local_id: {local_id} -> {e}"
print(error_msg)
logger.error(error_msg)
return None
def parse_pay_xml(xml_str: str, local_id: int) -> Optional[Dict[str, Any]]:
"""解析微信支付 XML"""
if not xml_str.strip():
return None
try:
root = ET.fromstring(xml_str)
def get_text(path):
node = root.find(path)
return node.text if node is not None else ""
record = {
"title": get_text(".//appmsg/title"),
"description": get_text(".//appmsg/des"),
"merchant_name": get_text(".//template_header/display_name"),
"merchant_icon": get_text(".//template_header/icon_url"),
"timestamp": int(get_text(".//pub_time") or 0),
"formatted_time": ""
}
return record
except Exception as e:
error_msg = f"❌ [解析微信支付XML失败] 支付id: gh_3dfda90e39d6, local_id: {local_id} -> {e}"
print(error_msg)
logger.error(error_msg)
return None
@router.get("/api/biz/proxy_image", summary="代理请求微信服务号图片")
def proxy_biz_image(url: str):
if not url:
return Response(status_code=400)
try:
req = urllib.request.Request(url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
})
with urllib.request.urlopen(req, timeout=10) as response:
content = response.read()
content_type = response.headers.get('Content-Type', 'image/jpeg')
return Response(content=content, media_type=content_type)
except Exception as e:
logger.error(f"[biz] 代理图片失败: {url} -> {e}")
return Response(status_code=500)
# 接口 1:获取全部的服务号/公众号的信息
@router.get("/api/biz/list", summary="获取全部服务号/公众号列表")
def get_biz_account_list(account: Optional[str] = None):
account_dir = _resolve_account_dir(account)
biz_ids = set()
biz_latest_time = {}
# 1. 遍历 biz_message_*.db
for db_file in account_dir.glob("biz_message*.db"):
try:
conn = sqlite3.connect(str(db_file))
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(Name2Id)")
cols = [row[1].lower() for row in cursor.fetchall()]
user_col = "username" if "username" in cols else "user_name" if "user_name" in cols else ""
if user_col:
rows = cursor.execute(f"SELECT {user_col} FROM Name2Id").fetchall()
for r in rows:
if r[0]:
uname = r[0]
biz_ids.add(uname)
# 顺便查询该号的最后一条消息时间
md5_id = hashlib.md5(uname.encode('utf-8')).hexdigest().lower()
table_name = f"Msg_{md5_id}"
try:
time_res = conn.execute(f"SELECT MAX(create_time) FROM {table_name}").fetchone()
if time_res and time_res[0]:
current_max = biz_latest_time.get(uname, 0)
biz_latest_time[uname] = max(current_max, time_res[0])
except Exception:
pass
conn.close()
except Exception as e:
logger.warning(f"读取 Name2Id 失败 {db_file}: {e}")
contact_db_path = account_dir / "contact.db"
contact_info = {}
if contact_db_path.exists() and biz_ids:
try:
conn = sqlite3.connect(str(contact_db_path))
cursor = conn.cursor()
placeholders = ",".join(["?"] * len(biz_ids))
# 先查 contact 表
query_contact = f"SELECT username, remark, nick_name, alias, big_head_url FROM contact WHERE username IN ({placeholders})"
rows_contact = cursor.execute(query_contact, list(biz_ids)).fetchall()
for r in rows_contact:
uname = r[0]
name = r[1] or r[2] or r[3] or uname
contact_info[uname] = {
"username": uname,
"name": name,
"avatar": r[4],
"type": 3 # 默认给个 3(未知)
}
# 再查 biz_info 表获取类型
try:
query_biz = f"SELECT username, type FROM biz_info WHERE username IN ({placeholders})"
rows_biz = cursor.execute(query_biz, list(biz_ids)).fetchall()
for r in rows_biz:
uname = r[0]
biz_type = r[1]
# 如果查到了且是 0, 1, 2,就更新进去,否则保留 3
if uname in contact_info:
if biz_type in (0, 1, 2):
contact_info[uname]["type"] = biz_type
else:
contact_info[uname]["type"] = 3
except Exception as e:
logger.warning(f"读取 biz_info 失败: {e}")
conn.close()
except Exception as e:
logger.warning(f"读取 contact.db 失败: {e}")
# 3. 组装结果(不在 contact_info 里的直接丢弃)
result = []
for uid in biz_ids:
if uid in contact_info:
info = contact_info[uid]
info["last_time"] = biz_latest_time.get(uid, 0)
if info["last_time"]:
# 格式化日期给前端展示用
info["formatted_last_time"] = time.strftime("%Y-%m-%d", time.localtime(info["last_time"]))
else:
info["formatted_last_time"] = ""
result.append(info)
# 4. 按最后一条消息的时间降序排列
result.sort(key=lambda x: x.get("last_time", 0), reverse=True)
return {"status": "success", "total": len(result), "data": result}
# 接口 2:获取普通服务号/公众号的 json 消息 (已修复表名比对 bug)
@router.get("/api/biz/messages", summary="获取指定服务号的消息")
def get_biz_messages(username: str, account: Optional[str] = None, limit: int = 50, offset: int = 0):
if username == "gh_3dfda90e39d6":
raise HTTPException(status_code=400, detail="微信支付记录请请求 /api/biz/pay_records 接口")
account_dir = _resolve_account_dir(account)
md5_id = hashlib.md5(username.encode('utf-8')).hexdigest().lower()
table_name = f"Msg_{md5_id}"
target_db = None
for db_file in account_dir.glob("biz_message*.db"):
conn = sqlite3.connect(str(db_file))
try:
# 必须用 table_name.lower(),否则永远匹配不上
res = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=?",
(table_name.lower(),)).fetchone()
if res:
target_db = db_file
break
except Exception:
pass
finally:
conn.close()
if not target_db:
return {"status": "success", "data": [], "message": f"未找到 {username} 的消息历史"}
# ... (后续数据库查询逻辑保持不变) ...
messages = []
try:
conn = sqlite3.connect(str(target_db))
cursor = conn.cursor()
query = f"""
SELECT local_id, create_time, message_content
FROM [{table_name}]
WHERE local_type != 1
ORDER BY create_time DESC
LIMIT ? OFFSET ?
"""
rows = cursor.execute(query, (limit, offset)).fetchall()
for local_id, c_time, content in rows:
raw_xml = extract_xml_from_db_content(content, username, local_id)
if not raw_xml:
continue
struct_data = parse_wechat_xml_to_struct(raw_xml, username, local_id)
if struct_data:
struct_data["local_id"] = local_id
struct_data["create_time"] = c_time
messages.append(struct_data)
conn.close()
except Exception as e:
logger.error(f"[biz] 数据库查询出错: {e}")
return {"status": "error", "message": str(e)}
return {"status": "success", "data": messages}
# 接口 3:返回微信支付的 json 消息 (已修复表名比对 bug)
@router.get("/api/biz/pay_records", summary="获取微信支付记录")
def get_wechat_pay_records(account: Optional[str] = None, limit: int = 50, offset: int = 0):
username = "gh_3dfda90e39d6"
account_dir = _resolve_account_dir(account)
md5_id = hashlib.md5(username.encode('utf-8')).hexdigest().lower()
table_name = f"Msg_{md5_id}"
target_db = None
for db_file in account_dir.glob("biz_message*.db"):
conn = sqlite3.connect(str(db_file))
try:
# 必须用 table_name.lower()
res = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=?",
(table_name.lower(),)).fetchone()
if res:
target_db = db_file
break
except Exception:
pass
finally:
conn.close()
if not target_db:
return {"status": "success", "data": [], "message": "未找到微信支付的消息历史"}
messages = []
try:
conn = sqlite3.connect(str(target_db))
cursor = conn.cursor()
query = f"""
SELECT local_id, create_time, message_content
FROM [{table_name}]
WHERE local_type = 21474836529 OR local_type != 1
ORDER BY create_time DESC
LIMIT ? OFFSET ?
"""
rows = cursor.execute(query, (limit, offset)).fetchall()
for local_id, c_time, content in rows:
raw_xml = extract_xml_from_db_content(content, username, local_id)
if not raw_xml:
continue
parsed_data = parse_pay_xml(raw_xml, local_id)
if parsed_data:
parsed_data["local_id"] = local_id
parsed_data["create_time"] = c_time
if not parsed_data["timestamp"]:
parsed_data["timestamp"] = c_time
parsed_data["formatted_time"] = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(parsed_data["timestamp"])
)
messages.append(parsed_data)
conn.close()
except Exception as e:
logger.error(f"[biz] 查询微信支付数据库出错: {e}")
return {"status": "error", "message": str(e)}
return {"status": "success", "data": messages}
+326 -222
View File
@@ -1391,6 +1391,299 @@ def _load_contact_top_flags(contact_db_path: Path, usernames: list[str]) -> dict
conn.close()
def _coerce_realtime_blobish_value(value: Any) -> Any:
if value is None:
return None
if isinstance(value, memoryview):
value = value.tobytes()
if isinstance(value, bytearray):
return bytes(value)
if isinstance(value, bytes):
try:
s = value.decode("ascii").strip()
except Exception:
return value
if not s:
return value
b = _hex_to_bytes(s)
if b is not None:
return b
if (len(s) % 2 == 0) and (_HEX_RE.fullmatch(s) is not None):
try:
return bytes.fromhex(s)
except Exception:
return value
return value
if isinstance(value, str):
s = value.strip()
if not s:
return value
b = _hex_to_bytes(s)
if b is not None:
return b
if (len(s) % 2 == 0) and (_HEX_RE.fullmatch(s) is not None):
try:
return bytes.fromhex(s)
except Exception:
return value
return value
return value
def _normalize_realtime_message_item(item: dict[str, Any]) -> dict[str, Any]:
def _pick(*keys: str) -> Any:
return _pick_case_insensitive_value(item, *keys)
message_content = _coerce_realtime_blobish_value(
_pick("message_content", "messageContent", "MessageContent")
)
if message_content is None:
message_content = ""
return {
"local_id": int(_pick("local_id", "localId") or 0),
"server_id": int(_pick("server_id", "serverId", "MsgSvrID") or 0),
"local_type": int(_pick("local_type", "localType", "Type", "type") or 0),
"sort_seq": int(_pick("sort_seq", "sortSeq", "SortSeq") or 0),
"real_sender_id": int(_pick("real_sender_id", "realSenderId") or 0),
"create_time": int(_pick("create_time", "createTime", "CreateTime") or 0),
"message_content": message_content,
"compress_content": _coerce_realtime_blobish_value(
_pick("compress_content", "compressContent", "CompressContent")
),
"packed_info_data": _coerce_realtime_blobish_value(
_pick("packed_info_data", "packedInfoData", "PackedInfoData")
),
"sender_username": str(
_pick("sender_username", "senderUsername", "sender", "SenderUsername") or ""
).strip(),
}
def _collect_realtime_rows_for_session(
*,
trace_id: Optional[str],
account_name: str,
rt_conn: Any,
username: str,
msg_db_path_real: Path,
table_name: str,
max_local_id: int,
max_scan: int,
backfill_limit: int,
) -> dict[str, Any]:
label = f"[{trace_id}]" if trace_id else "[realtime]"
log_fn = logger.info if trace_id else logger.debug
uname = str(username or "").strip()
use_biz_exec_query = uname.startswith("gh_") and ("biz_message" in str(msg_db_path_real.name).lower())
if use_biz_exec_query:
try:
quoted_table = _quote_ident(table_name)
select_cols = (
"local_id",
"server_id",
"local_type",
"sort_seq",
"real_sender_id",
"create_time",
"message_content",
"compress_content",
"packed_info_data",
)
select_sql = ", ".join([_quote_ident(col) for col in select_cols])
if int(max_local_id) > 0:
sql_new = (
f"SELECT {select_sql} FROM {quoted_table} "
f"WHERE local_id > {int(max_local_id)} "
f"ORDER BY local_id ASC LIMIT {int(max_scan)}"
)
else:
sql_new = f"SELECT {select_sql} FROM {quoted_table} ORDER BY local_id DESC LIMIT {int(max_scan)}"
log_fn(
"%s wcdb_exec_query biz account=%s username=%s mode=new_rows max_local_id=%s limit=%s",
label,
account_name,
uname,
int(max_local_id),
int(max_scan),
)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_new_rows = _wcdb_exec_query(rt_conn.handle, kind="message", path=str(msg_db_path_real), sql=sql_new)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.info(
"%s wcdb_exec_query biz done account=%s username=%s mode=new_rows rows=%s ms=%.1f",
label,
account_name,
uname,
len(raw_new_rows or []),
wcdb_ms,
)
if wcdb_ms > 2000:
logger.warning(
"%s wcdb_exec_query biz slow account=%s username=%s mode=new_rows ms=%.1f",
label,
account_name,
uname,
wcdb_ms,
)
normalized_new_rows: list[dict[str, Any]] = []
for item in raw_new_rows or []:
if not isinstance(item, dict):
continue
norm = _normalize_realtime_message_item(item)
if int(norm.get("local_id") or 0) <= 0:
continue
normalized_new_rows.append(norm)
if int(max_local_id) > 0:
new_rows = list(reversed(normalized_new_rows))
else:
new_rows = normalized_new_rows
backfill_rows: list[dict[str, Any]] = []
scanned = len(raw_new_rows or [])
if int(backfill_limit) > 0 and int(max_local_id) > 0:
sql_backfill = (
f"SELECT {select_sql} FROM {quoted_table} "
f"WHERE local_id <= {int(max_local_id)} "
f"ORDER BY local_id DESC LIMIT {int(backfill_limit)}"
)
log_fn(
"%s wcdb_exec_query biz account=%s username=%s mode=backfill limit=%s",
label,
account_name,
uname,
int(backfill_limit),
)
backfill_t0 = time.perf_counter()
with rt_conn.lock:
raw_backfill_rows = _wcdb_exec_query(
rt_conn.handle,
kind="message",
path=str(msg_db_path_real),
sql=sql_backfill,
)
backfill_ms = (time.perf_counter() - backfill_t0) * 1000.0
logger.info(
"%s wcdb_exec_query biz done account=%s username=%s mode=backfill rows=%s ms=%.1f",
label,
account_name,
uname,
len(raw_backfill_rows or []),
backfill_ms,
)
if backfill_ms > 2000:
logger.warning(
"%s wcdb_exec_query biz slow account=%s username=%s mode=backfill ms=%.1f",
label,
account_name,
uname,
backfill_ms,
)
scanned += len(raw_backfill_rows or [])
for item in raw_backfill_rows or []:
if not isinstance(item, dict):
continue
norm = _normalize_realtime_message_item(item)
if int(norm.get("local_id") or 0) <= 0:
continue
backfill_rows.append(norm)
return {
"fetchMode": "biz_exec_query",
"scanned": int(scanned),
"new_rows": new_rows,
"backfill_rows": backfill_rows,
}
except Exception as e:
logger.warning(
"%s wcdb_exec_query biz failed account=%s username=%s err=%s fallback=wcdb_get_messages",
label,
account_name,
uname,
str(e),
)
batch_size = 200
scanned = 0
offset = 0
new_rows: list[dict[str, Any]] = []
backfill_rows: list[dict[str, Any]] = []
reached_existing = False
stop = False
while scanned < int(max_scan):
take = min(batch_size, int(max_scan) - scanned)
log_fn(
"%s wcdb_get_messages account=%s username=%s take=%s offset=%s",
label,
account_name,
uname,
int(take),
int(offset),
)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_rows = _wcdb_get_messages(rt_conn.handle, uname, limit=take, offset=offset)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
log_fn(
"%s wcdb_get_messages done account=%s username=%s rows=%s ms=%.1f",
label,
account_name,
uname,
len(raw_rows or []),
wcdb_ms,
)
if wcdb_ms > 2000:
logger.warning(
"%s wcdb_get_messages slow account=%s username=%s ms=%.1f",
label,
account_name,
uname,
wcdb_ms,
)
if not raw_rows:
break
scanned += len(raw_rows)
offset += len(raw_rows)
for item in raw_rows:
if not isinstance(item, dict):
continue
norm = _normalize_realtime_message_item(item)
lid = int(norm.get("local_id") or 0)
if lid <= 0:
continue
if (not reached_existing) and lid > int(max_local_id):
new_rows.append(norm)
continue
reached_existing = True
if int(backfill_limit) <= 0:
stop = True
break
backfill_rows.append(norm)
if len(backfill_rows) >= int(backfill_limit):
stop = True
break
if stop or len(raw_rows) < take:
break
return {
"fetchMode": "wcdb_get_messages",
"scanned": int(scanned),
"new_rows": new_rows,
"backfill_rows": backfill_rows,
}
@router.post("/api/chat/realtime/sync", summary="实时消息同步到解密库(按会话增量)")
def sync_chat_realtime_messages(
request: Request,
@@ -1511,118 +1804,20 @@ def sync_chat_realtime_messages(
placeholders = ",".join(["?"] * len(insert_cols))
insert_sql = f"INSERT OR IGNORE INTO {quoted_table} ({','.join(insert_cols)}) VALUES ({placeholders})"
def pick(item: dict[str, Any], *keys: str) -> Any:
for k in keys:
if k in item and item[k] is not None:
return item[k]
lk = k.lower()
for kk in item.keys():
if str(kk).lower() == lk and item[kk] is not None:
return item[kk]
return None
def normalize_blob(value: Any) -> Optional[bytes]:
if value is None:
return None
if isinstance(value, memoryview):
return value.tobytes()
if isinstance(value, (bytes, bytearray)):
return bytes(value)
if isinstance(value, str):
s = value.strip()
if s.lower().startswith("0x"):
s = s[2:]
if s and re.fullmatch(r"[0-9a-fA-F]+", s) and (len(s) % 2 == 0):
try:
return bytes.fromhex(s)
except Exception:
return None
return s.encode("utf-8", errors="ignore")
return None
def normalize(item: dict[str, Any]) -> dict[str, Any]:
return {
"local_id": int(pick(item, "local_id", "localId") or 0),
"server_id": int(pick(item, "server_id", "serverId", "MsgSvrID") or 0),
"local_type": int(pick(item, "local_type", "localType", "Type", "type") or 0),
"sort_seq": int(pick(item, "sort_seq", "sortSeq", "SortSeq") or 0),
"real_sender_id": int(pick(item, "real_sender_id", "realSenderId") or 0),
"create_time": int(pick(item, "create_time", "createTime", "CreateTime") or 0),
"message_content": pick(item, "message_content", "messageContent", "MessageContent") or "",
"compress_content": pick(item, "compress_content", "compressContent", "CompressContent"),
"packed_info_data": normalize_blob(pick(item, "packed_info_data", "packedInfoData")),
"sender_username": str(
pick(item, "sender_username", "senderUsername", "sender", "SenderUsername") or ""
).strip(),
}
batch_size = 200
scanned = 0
offset = 0
new_rows: list[dict[str, Any]] = []
backfill_rows: list[dict[str, Any]] = []
reached_existing = False
stop = False
while scanned < int(max_scan):
take = min(batch_size, int(max_scan) - scanned)
logger.info(
"[%s] wcdb_get_messages account=%s username=%s take=%s offset=%s",
trace_id,
account_dir.name,
username,
int(take),
int(offset),
)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_rows = _wcdb_get_messages(rt_conn.handle, username, limit=take, offset=offset)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.info(
"[%s] wcdb_get_messages done account=%s username=%s rows=%s ms=%.1f",
trace_id,
account_dir.name,
username,
len(raw_rows or []),
wcdb_ms,
)
if wcdb_ms > 2000:
logger.warning(
"[%s] wcdb_get_messages slow account=%s username=%s ms=%.1f",
trace_id,
account_dir.name,
username,
wcdb_ms,
)
if not raw_rows:
break
scanned += len(raw_rows)
offset += len(raw_rows)
for item in raw_rows:
if not isinstance(item, dict):
continue
norm = normalize(item)
lid = int(norm.get("local_id") or 0)
if lid <= 0:
continue
if (not reached_existing) and lid > max_local_id:
new_rows.append(norm)
continue
reached_existing = True
if int(backfill_limit) <= 0:
stop = True
break
backfill_rows.append(norm)
if len(backfill_rows) >= int(backfill_limit):
stop = True
break
if stop or len(raw_rows) < take:
break
fetch_result = _collect_realtime_rows_for_session(
trace_id=trace_id,
account_name=account_dir.name,
rt_conn=rt_conn,
username=username,
msg_db_path_real=msg_db_path_real,
table_name=table_name,
max_local_id=max_local_id,
max_scan=int(max_scan),
backfill_limit=int(backfill_limit),
)
scanned = int(fetch_result.get("scanned") or 0)
new_rows = list(fetch_result.get("new_rows") or [])
backfill_rows = list(fetch_result.get("backfill_rows") or [])
inserted = 0
backfilled = 0
@@ -1880,115 +2075,20 @@ def _sync_chat_realtime_messages_for_table(
placeholders = ",".join(["?"] * len(insert_cols))
insert_sql = f"INSERT OR IGNORE INTO {quoted_table} ({','.join(insert_cols)}) VALUES ({placeholders})"
def pick(item: dict[str, Any], *keys: str) -> Any:
for k in keys:
if k in item and item[k] is not None:
return item[k]
lk = k.lower()
for kk in item.keys():
if str(kk).lower() == lk and item[kk] is not None:
return item[kk]
return None
def normalize_blob(value: Any) -> Optional[bytes]:
if value is None:
return None
if isinstance(value, memoryview):
return value.tobytes()
if isinstance(value, (bytes, bytearray)):
return bytes(value)
if isinstance(value, str):
s = value.strip()
if s.lower().startswith("0x"):
s = s[2:]
if s and re.fullmatch(r"[0-9a-fA-F]+", s) and (len(s) % 2 == 0):
try:
return bytes.fromhex(s)
except Exception:
return None
return s.encode("utf-8", errors="ignore")
return None
def normalize(item: dict[str, Any]) -> dict[str, Any]:
return {
"local_id": int(pick(item, "local_id", "localId") or 0),
"server_id": int(pick(item, "server_id", "serverId", "MsgSvrID") or 0),
"local_type": int(pick(item, "local_type", "localType", "Type", "type") or 0),
"sort_seq": int(pick(item, "sort_seq", "sortSeq", "SortSeq") or 0),
"real_sender_id": int(pick(item, "real_sender_id", "realSenderId") or 0),
"create_time": int(pick(item, "create_time", "createTime", "CreateTime") or 0),
"message_content": pick(item, "message_content", "messageContent", "MessageContent") or "",
"compress_content": pick(item, "compress_content", "compressContent", "CompressContent"),
"packed_info_data": normalize_blob(pick(item, "packed_info_data", "packedInfoData")),
"sender_username": str(
pick(item, "sender_username", "senderUsername", "sender", "SenderUsername") or ""
).strip(),
}
batch_size = 200
scanned = 0
offset = 0
new_rows: list[dict[str, Any]] = []
backfill_rows: list[dict[str, Any]] = []
reached_existing = False
stop = False
while scanned < int(max_scan):
take = min(batch_size, int(max_scan) - scanned)
logger.debug(
"[realtime] wcdb_get_messages account=%s username=%s take=%s offset=%s",
account_dir.name,
username,
int(take),
int(offset),
)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_rows = _wcdb_get_messages(rt_conn.handle, username, limit=take, offset=offset)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.debug(
"[realtime] wcdb_get_messages done account=%s username=%s rows=%s ms=%.1f",
account_dir.name,
username,
len(raw_rows or []),
wcdb_ms,
)
if wcdb_ms > 2000:
logger.warning(
"[realtime] wcdb_get_messages slow account=%s username=%s ms=%.1f",
account_dir.name,
username,
wcdb_ms,
)
if not raw_rows:
break
scanned += len(raw_rows)
offset += len(raw_rows)
for item in raw_rows:
if not isinstance(item, dict):
continue
norm = normalize(item)
lid = int(norm.get("local_id") or 0)
if lid <= 0:
continue
if (not reached_existing) and lid > max_local_id:
new_rows.append(norm)
continue
reached_existing = True
if int(backfill_limit) <= 0:
stop = True
break
backfill_rows.append(norm)
if len(backfill_rows) >= int(backfill_limit):
stop = True
break
if stop or len(raw_rows) < take:
break
fetch_result = _collect_realtime_rows_for_session(
trace_id=None,
account_name=account_dir.name,
rt_conn=rt_conn,
username=username,
msg_db_path_real=msg_db_path_real,
table_name=table_name,
max_local_id=max_local_id,
max_scan=int(max_scan),
backfill_limit=int(backfill_limit),
)
scanned = int(fetch_result.get("scanned") or 0)
new_rows = list(fetch_result.get("new_rows") or [])
backfill_rows = list(fetch_result.get("backfill_rows") or [])
inserted = 0
backfilled = 0
@@ -2163,6 +2263,7 @@ def sync_chat_realtime_messages_all(
priority_max_scan: int = 600,
include_hidden: bool = True,
include_official: bool = True,
only_official: bool = False,
backfill_limit: int = 200,
):
"""
@@ -2173,13 +2274,14 @@ def sync_chat_realtime_messages_all(
account_dir = _resolve_account_dir(account)
trace_id = f"rt-syncall-{int(time.time() * 1000)}-{threading.get_ident()}"
logger.info(
"[%s] realtime sync_all start account=%s max_scan=%s priority=%s include_hidden=%s include_official=%s",
"[%s] realtime sync_all start account=%s max_scan=%s priority=%s include_hidden=%s include_official=%s only_official=%s",
trace_id,
account_dir.name,
int(max_scan),
str(priority_username or "").strip(),
bool(include_hidden),
bool(include_official),
bool(only_official),
)
if max_scan < 20:
@@ -2241,6 +2343,8 @@ def sync_chat_realtime_messages_all(
hidden_val = 0
if not include_hidden and hidden_val == 1:
continue
if only_official and not uname.startswith("gh_"):
continue
if not _should_keep_session(uname, include_official=include_official):
continue
+34
View File
@@ -26,6 +26,32 @@ _DEFAULT_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
_WCDB_API_DLL_SELECTED: Optional[Path] = None
def _iter_runtime_wcdb_api_dll_paths() -> tuple[Path, ...]:
candidates: list[Path] = []
seen: set[str] = set()
def add_anchor(anchor: str | Path | None) -> None:
if not anchor:
return
try:
base = Path(anchor).resolve()
except Exception:
base = Path(anchor)
candidate = base / "native" / "wcdb_api.dll"
key = str(candidate).replace("/", "\\").rstrip("\\").lower()
if key in seen:
return
seen.add(key)
candidates.append(candidate)
add_anchor(os.environ.get("WECHAT_TOOL_DATA_DIR", "").strip())
add_anchor(Path.cwd())
if getattr(sys, "frozen", False):
add_anchor(Path(sys.executable).resolve().parent)
return tuple(candidates)
def _is_project_wcdb_api_dll_path(path: Path) -> bool:
try:
resolved = path.resolve(strict=False)
@@ -40,6 +66,14 @@ def _is_project_wcdb_api_dll_path(path: Path) -> bool:
if resolved == default_resolved:
return True
for candidate in _iter_runtime_wcdb_api_dll_paths():
try:
if resolved == candidate.resolve(strict=False):
return True
except Exception:
if resolved == candidate:
return True
parts = tuple(str(part).lower() for part in resolved.parts)
allowed_suffixes = (
("backend", "native", "wcdb_api.dll"),