mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-02-02 05:50:50 +08:00
improvement(chat): 优化会话时间显示并增强文件消息资源定位
- 会话列表时间按 今天/昨天/星期/日期/跨年 格式化 - 文件类消息在 msg/file 目录下按时间与多策略更稳健定位
This commit is contained in:
@@ -98,14 +98,39 @@ def _should_keep_session(username: str, include_official: bool) -> bool:
|
||||
|
||||
|
||||
def _format_session_time(ts: Optional[int]) -> str:
|
||||
"""智能时间格式化:今天显示时间,昨天显示"昨天 HH:MM",本周显示"星期X HH:MM",本年显示"M月D日 HH:MM",跨年显示"YYYY年M月D日 HH:MM"""
|
||||
if not ts:
|
||||
return ""
|
||||
try:
|
||||
dt = datetime.fromtimestamp(int(ts))
|
||||
now = datetime.now()
|
||||
if dt.date() == now.date():
|
||||
return dt.strftime("%H:%M")
|
||||
return dt.strftime("%m/%d")
|
||||
time_str = dt.strftime("%H:%M")
|
||||
|
||||
# 计算日期差异(基于日历日期)
|
||||
today_start = datetime(now.year, now.month, now.day)
|
||||
target_start = datetime(dt.year, dt.month, dt.day)
|
||||
day_diff = (today_start - target_start).days
|
||||
|
||||
# 今天
|
||||
if day_diff == 0:
|
||||
return time_str
|
||||
|
||||
# 昨天
|
||||
if day_diff == 1:
|
||||
return f"昨天 {time_str}"
|
||||
|
||||
# 本周内(2-6天前,显示星期)
|
||||
if 2 <= day_diff <= 6:
|
||||
week_days = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
|
||||
# Python weekday(): Monday=0, Sunday=6
|
||||
return f"{week_days[dt.weekday()]} {time_str}"
|
||||
|
||||
# 本年内
|
||||
if dt.year == now.year:
|
||||
return f"{dt.month}月{dt.day}日 {time_str}"
|
||||
|
||||
# 跨年
|
||||
return f"{dt.year}年{dt.month}月{dt.day}日 {time_str}"
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import ctypes
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import mimetypes
|
||||
@@ -377,10 +378,183 @@ def _resolve_media_path_from_hardlink(
|
||||
if not row:
|
||||
continue
|
||||
|
||||
dir1 = str(row["dir1"] or "").strip()
|
||||
dir2 = str(row["dir2"] or "").strip()
|
||||
file_name = str(row["file_name"] or "").strip()
|
||||
if not dir1 or not dir2 or not file_name:
|
||||
if not file_name:
|
||||
continue
|
||||
|
||||
if kind_key == "file":
|
||||
try:
|
||||
full_row = conn.execute(
|
||||
f"SELECT file_name, file_size, modify_time FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1",
|
||||
(md5,),
|
||||
).fetchone()
|
||||
except Exception:
|
||||
full_row = None
|
||||
|
||||
file_size: Optional[int] = None
|
||||
modify_time: Optional[int] = None
|
||||
if full_row is not None:
|
||||
try:
|
||||
if full_row["file_size"] is not None:
|
||||
file_size = int(full_row["file_size"])
|
||||
except Exception:
|
||||
file_size = None
|
||||
try:
|
||||
if full_row["modify_time"] is not None:
|
||||
modify_time = int(full_row["modify_time"])
|
||||
except Exception:
|
||||
modify_time = None
|
||||
|
||||
roots: list[Path] = []
|
||||
for r in [wxid_dir] + (extra_roots or []):
|
||||
if not r:
|
||||
continue
|
||||
try:
|
||||
rr = r.resolve()
|
||||
except Exception:
|
||||
rr = r
|
||||
if rr not in roots:
|
||||
roots.append(rr)
|
||||
|
||||
file_base_dirs: list[Path] = []
|
||||
for root in roots:
|
||||
candidates = [
|
||||
root / "msg" / "file",
|
||||
root / "file" if root.name.lower() == "msg" else None,
|
||||
root if root.name.lower() == "file" else None,
|
||||
]
|
||||
for c in candidates:
|
||||
if not c:
|
||||
continue
|
||||
try:
|
||||
if c.exists() and c.is_dir() and c not in file_base_dirs:
|
||||
file_base_dirs.append(c)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not file_base_dirs:
|
||||
return None
|
||||
|
||||
guessed_month: Optional[str] = None
|
||||
if modify_time:
|
||||
try:
|
||||
dt = datetime.datetime.fromtimestamp(int(modify_time))
|
||||
guessed_month = f"{dt.year:04d}-{dt.month:02d}"
|
||||
except Exception:
|
||||
guessed_month = None
|
||||
|
||||
file_stem = Path(file_name).stem
|
||||
|
||||
def _iter_month_dirs(base: Path) -> list[Path]:
|
||||
out: list[Path] = []
|
||||
try:
|
||||
for child in base.iterdir():
|
||||
try:
|
||||
if not child.is_dir():
|
||||
continue
|
||||
except Exception:
|
||||
continue
|
||||
name = str(child.name)
|
||||
if re.fullmatch(r"\d{4}-\d{2}", name):
|
||||
out.append(child)
|
||||
except Exception:
|
||||
return []
|
||||
return sorted(out, key=lambda p: str(p.name))
|
||||
|
||||
def _pick_best_hit(hits: list[Path]) -> Optional[Path]:
|
||||
if not hits:
|
||||
return None
|
||||
if file_size is not None and file_size >= 0:
|
||||
for h in hits:
|
||||
try:
|
||||
if h.stat().st_size == file_size:
|
||||
return h
|
||||
except Exception:
|
||||
continue
|
||||
return hits[0]
|
||||
|
||||
for base in file_base_dirs:
|
||||
month_dirs = _iter_month_dirs(base)
|
||||
month_names: list[str] = []
|
||||
if guessed_month:
|
||||
month_names.append(guessed_month)
|
||||
for d in month_dirs:
|
||||
n = str(d.name)
|
||||
if n not in month_names:
|
||||
month_names.append(n)
|
||||
|
||||
for month_name in month_names:
|
||||
month_dir = base / month_name
|
||||
try:
|
||||
if not (month_dir.exists() and month_dir.is_dir()):
|
||||
continue
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
direct = month_dir / file_name
|
||||
try:
|
||||
if direct.exists() and direct.is_file():
|
||||
return direct
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
in_stem_dir = month_dir / file_stem / file_name
|
||||
try:
|
||||
if in_stem_dir.exists() and in_stem_dir.is_file():
|
||||
return in_stem_dir
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
hits: list[Path] = []
|
||||
try:
|
||||
for p in month_dir.rglob(file_name):
|
||||
try:
|
||||
if p.is_file():
|
||||
hits.append(p)
|
||||
if len(hits) >= 20:
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
except Exception:
|
||||
hits = []
|
||||
|
||||
best = _pick_best_hit(hits)
|
||||
if best:
|
||||
return best
|
||||
|
||||
# Final fallback: search across all months (covers rare nesting patterns)
|
||||
hits_all: list[Path] = []
|
||||
try:
|
||||
for p in base.rglob(file_name):
|
||||
try:
|
||||
if p.is_file():
|
||||
hits_all.append(p)
|
||||
if len(hits_all) >= 50:
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
except Exception:
|
||||
hits_all = []
|
||||
|
||||
best_all = _pick_best_hit(hits_all)
|
||||
if best_all:
|
||||
return best_all
|
||||
|
||||
if guessed_month:
|
||||
fallback_dir = base / guessed_month
|
||||
try:
|
||||
if fallback_dir.exists() and fallback_dir.is_dir():
|
||||
return fallback_dir
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return base
|
||||
|
||||
return None
|
||||
|
||||
dir1 = str(row["dir1"] if row["dir1"] is not None else "").strip()
|
||||
dir2 = str(row["dir2"] if row["dir2"] is not None else "").strip()
|
||||
if not dir1 or not dir2:
|
||||
continue
|
||||
|
||||
dir_name = dir2
|
||||
|
||||
Reference in New Issue
Block a user