fix: 修复新版本网页不适配的问题
This commit is contained in:
+334
-16
@@ -80,10 +80,18 @@ class AnswerRunner:
|
||||
def _go_login(self, page) -> None:
|
||||
relogin = page.get_by_role("button", name="重新登录")
|
||||
if relogin.count() > 0:
|
||||
print("Login expired dialog detected. Clicking relogin.")
|
||||
relogin.first.click()
|
||||
time.sleep(1)
|
||||
else:
|
||||
relogin_text = page.get_by_text("重新登录", exact=True)
|
||||
if relogin_text.count() > 0:
|
||||
print("Login expired dialog detected. Clicking relogin.")
|
||||
relogin_text.first.click()
|
||||
time.sleep(1)
|
||||
|
||||
if "/login" not in page.url:
|
||||
print("Opening login page.")
|
||||
page.goto(self.settings.login_url)
|
||||
|
||||
page.wait_for_load_state("domcontentloaded")
|
||||
@@ -162,10 +170,20 @@ class AnswerRunner:
|
||||
page.wait_for_event("close", timeout=0)
|
||||
|
||||
def _select_question_bank(self, page, keyword: str) -> None:
|
||||
if "/question/bank" in page.url:
|
||||
self._recover_from_question_bank_picker(page)
|
||||
|
||||
self._wait_for_question_bank_tabs(page)
|
||||
tabs = page.get_by_role("tab")
|
||||
tabs = page.locator("[role='tab']")
|
||||
count = tabs.count()
|
||||
|
||||
if "/question/bank" in page.url:
|
||||
self._recover_from_question_bank_picker(page)
|
||||
tabs = page.locator("[role='tab']")
|
||||
count = tabs.count()
|
||||
|
||||
if count == 0:
|
||||
self._print_debug_page(page, "No question bank tabs found")
|
||||
raise SystemExit("No question bank tabs found.")
|
||||
|
||||
names = [tabs.nth(index).inner_text().strip() for index in range(count)]
|
||||
@@ -177,23 +195,192 @@ class AnswerRunner:
|
||||
tabs.nth(best_index).click()
|
||||
page.wait_for_load_state("networkidle")
|
||||
|
||||
def _goto_target_page(self, page) -> None:
|
||||
page.goto(self.settings.target_url)
|
||||
page.wait_for_load_state("networkidle")
|
||||
if self._needs_login(page):
|
||||
return
|
||||
def _recover_from_question_bank_picker(self, page) -> None:
|
||||
print("Question bank picker is open. Returning to practice entry page.")
|
||||
self._close_question_bank_search_modal(page)
|
||||
if self._click_question_bank_back(page):
|
||||
try:
|
||||
self._wait_for_question_bank_tabs(page)
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
page.goto(self.settings.target_url, wait_until="domcontentloaded")
|
||||
self._wait_for_question_bank_tabs(page)
|
||||
|
||||
def _wait_for_question_bank_tabs(self, page) -> None:
|
||||
def _click_question_bank_back(self, page) -> bool:
|
||||
if "/question/bank" not in page.url:
|
||||
return False
|
||||
|
||||
clicked = page.evaluate(
|
||||
r"""() => {
|
||||
const candidates = Array.from(document.querySelectorAll(".van-nav-bar__left, .left-icon"));
|
||||
const target = candidates.find(element => {
|
||||
const rect = element.getBoundingClientRect();
|
||||
const style = window.getComputedStyle(element);
|
||||
return rect.width > 0 && rect.height > 0 && style.visibility !== "hidden";
|
||||
});
|
||||
if (!target) return false;
|
||||
target.click();
|
||||
return true;
|
||||
}"""
|
||||
)
|
||||
if clicked:
|
||||
page.wait_for_timeout(1000)
|
||||
return bool(clicked)
|
||||
|
||||
def _close_question_bank_search_modal(self, page) -> None:
|
||||
try:
|
||||
page.evaluate(
|
||||
r"""() => {
|
||||
const candidates = Array.from(document.querySelectorAll(".van-popup .van-icon-cross, .van-popup .close, .van-icon-cross, [class*='close']"));
|
||||
const visible = candidates.find(element => {
|
||||
const rect = element.getBoundingClientRect();
|
||||
const style = window.getComputedStyle(element);
|
||||
return rect.width > 0 && rect.height > 0 && style.visibility !== "hidden";
|
||||
});
|
||||
if (visible) visible.click();
|
||||
}"""
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
page.wait_for_timeout(500)
|
||||
|
||||
def _select_question_bank_from_picker(self, page, keyword: str) -> None:
|
||||
if not self._click_question_bank_by_text(page, keyword):
|
||||
self._print_debug_page(page, f"Question bank keyword not found: {keyword!r}")
|
||||
raise SystemExit(f"Question bank keyword not found: {keyword!r}")
|
||||
|
||||
try:
|
||||
self._wait_for_question_bank_tabs(page)
|
||||
except Exception as error:
|
||||
self._print_debug_page(
|
||||
page,
|
||||
f"Question bank was clicked but practice page did not load: {error}",
|
||||
)
|
||||
raise PageDebugPrintedError(
|
||||
"Question bank was clicked but practice page did not load."
|
||||
) from error
|
||||
|
||||
def _open_question_bank_selector(self, page) -> bool:
|
||||
reselect = page.get_by_text("重选题库", exact=True)
|
||||
if reselect.count() == 0:
|
||||
return False
|
||||
|
||||
reselect.first.click()
|
||||
page.wait_for_function(
|
||||
"""() => {
|
||||
const text = document.body.innerText;
|
||||
const tabs = document.querySelectorAll('[role="tab"]');
|
||||
return tabs.length > 0 || text.includes('顺序练习');
|
||||
"() => location.href.includes('/question/bank')",
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
return True
|
||||
|
||||
def _click_question_bank_by_text(self, page, keyword: str) -> bool:
|
||||
self._wait_for_question_bank_picker(page)
|
||||
if self._click_visible_question_bank_candidate(page, keyword):
|
||||
print(f"Question bank selected by text: keyword={keyword!r}")
|
||||
page.wait_for_timeout(1000)
|
||||
return True
|
||||
|
||||
search = page.locator("input[placeholder='请输入题库名称']").first
|
||||
if search.count() > 0:
|
||||
search.fill(keyword)
|
||||
search.press("Enter")
|
||||
search_icon = page.locator(".el-icon-search").first
|
||||
if search_icon.count() > 0:
|
||||
search_icon.click()
|
||||
page.wait_for_timeout(3000)
|
||||
if self._click_visible_question_bank_candidate(page, keyword):
|
||||
print(f"Question bank selected by text: keyword={keyword!r}")
|
||||
page.wait_for_timeout(1000)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _wait_for_question_bank_picker(self, page) -> None:
|
||||
page.wait_for_function(
|
||||
r"""() => {
|
||||
const text = document.body.innerText || "";
|
||||
const hasItem = document.querySelectorAll(".first_level_item, .van-cell, .el-tree-node__label, [role='tab']").length > 0;
|
||||
return hasItem || (text.includes("暂无数据") && !text.includes("加载中"));
|
||||
}""",
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
|
||||
def _click_visible_question_bank_candidate(self, page, keyword: str) -> bool:
|
||||
normalized_keyword = self._normalize_text(keyword)
|
||||
return bool(page.evaluate(
|
||||
r"""keyword => {
|
||||
const normalize = value => value.toLowerCase().replace(/\s+/g, "");
|
||||
const blocked = /开始练习|顺序练习|模拟考试|正式考试|我的收藏|我的错题|模拟记录/;
|
||||
const elements = Array.from(document.querySelectorAll("[role='tab'], .van-cell, .el-tree-node__label, .first_level_item, p"));
|
||||
const candidates = [];
|
||||
|
||||
for (const element of elements) {
|
||||
const text = (element.innerText || element.textContent || "").trim();
|
||||
if (!text || blocked.test(text) || !normalize(text).includes(keyword)) continue;
|
||||
|
||||
const rect = element.getBoundingClientRect();
|
||||
const style = window.getComputedStyle(element);
|
||||
if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue;
|
||||
|
||||
candidates.push({ element, textLength: text.length, area: rect.width * rect.height });
|
||||
}
|
||||
|
||||
candidates.sort((a, b) => a.textLength - b.textLength || a.area - b.area);
|
||||
const target = candidates[0]?.element?.closest(".first_level_item, .van-cell, .el-tree-node, [role='tab']") || candidates[0]?.element;
|
||||
if (!target) return false;
|
||||
target.click();
|
||||
return true;
|
||||
}""",
|
||||
normalized_keyword,
|
||||
))
|
||||
|
||||
def _goto_target_page(self, page) -> None:
|
||||
page.goto(self.settings.target_url, wait_until="domcontentloaded")
|
||||
try:
|
||||
page.wait_for_function(
|
||||
r"""() => {
|
||||
const text = document.body?.innerText || "";
|
||||
const tabs = document.querySelectorAll('[role="tab"]');
|
||||
return tabs.length > 0
|
||||
|| text.includes("登录状态已过期")
|
||||
|| text.includes("重新登录")
|
||||
|| location.href.includes("/question/bank")
|
||||
|| location.href.includes("/login");
|
||||
}""",
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
except Exception as error:
|
||||
if not self._needs_login(page):
|
||||
self._print_debug_page(page, f"Target page did not become ready: {error}")
|
||||
raise PageDebugPrintedError("Target page did not become ready.") from error
|
||||
|
||||
if self._needs_login(page):
|
||||
return
|
||||
|
||||
if "/question/bank" in page.url:
|
||||
return
|
||||
|
||||
self._wait_for_question_bank_tabs(page)
|
||||
|
||||
def _wait_for_question_bank_tabs(self, page) -> None:
|
||||
last_error: Exception | None = None
|
||||
for attempt in range(2):
|
||||
try:
|
||||
page.wait_for_function(
|
||||
"""() => document.querySelectorAll('[role="tab"]').length > 0""",
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
return
|
||||
except Exception as error:
|
||||
last_error = error
|
||||
if attempt == 0 and "/app/autonomous/test/index" in page.url:
|
||||
print("Question bank tabs did not load. Reloading practice entry page.")
|
||||
page.reload(wait_until="domcontentloaded")
|
||||
|
||||
if last_error:
|
||||
raise last_error
|
||||
|
||||
def _best_match(self, keyword: str, names: list[str]) -> tuple[int, str, float]:
|
||||
normalized_keyword = self._normalize_text(keyword)
|
||||
best_index = 0
|
||||
@@ -218,11 +405,25 @@ class AnswerRunner:
|
||||
def _start_sequential_practice(self, page) -> None:
|
||||
start = page.get_by_text("开始练习", exact=True).first
|
||||
if start.count() == 0:
|
||||
self._print_debug_page(page, "Sequential practice button not found")
|
||||
raise SystemExit("Sequential practice button not found.")
|
||||
|
||||
start.click()
|
||||
page.wait_for_url("**/practice/topics**")
|
||||
page.wait_for_load_state("networkidle")
|
||||
try:
|
||||
page.wait_for_function(
|
||||
r"""() => {
|
||||
const text = document.body.innerText || "";
|
||||
return location.href.includes("/practice/topics")
|
||||
|| text.includes("答案解析")
|
||||
|| text.includes("下一题")
|
||||
|| /正确答案\s*[::]/.test(text);
|
||||
}""",
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
page.wait_for_load_state("networkidle")
|
||||
except Exception as error:
|
||||
self._print_debug_page(page, f"Sequential practice did not open: {error}")
|
||||
raise PageDebugPrintedError("Sequential practice did not open.") from error
|
||||
|
||||
def _answer_questions(self, page, count: int) -> None:
|
||||
answered = 0
|
||||
@@ -232,8 +433,8 @@ class AnswerRunner:
|
||||
answer, revealed_analysis = self._reveal_and_read_answer(page)
|
||||
total = "until end" if count < 0 else str(count)
|
||||
print(f"Answering question {answered + 1}/{total}: {answer}")
|
||||
if revealed_analysis:
|
||||
print("Answer analysis opened before selection; skipping selection verification.")
|
||||
if self._has_my_answer(page):
|
||||
print("Question already answered; skipping selection verification.")
|
||||
else:
|
||||
self._select_answer_options(page, answer)
|
||||
self._wait_after_answer()
|
||||
@@ -317,6 +518,13 @@ class AnswerRunner:
|
||||
def _read_my_answer_from_dom(self, page) -> str:
|
||||
return self._read_answer_field_from_dom(page, "我的答案")
|
||||
|
||||
def _has_my_answer(self, page) -> bool:
|
||||
if self._read_my_answer_from_dom(page):
|
||||
return True
|
||||
|
||||
text = page.locator("body").inner_text(timeout=self.settings.timeout_ms)
|
||||
return bool(re.search(r"我的答案\s*[::]\s*\S+", text))
|
||||
|
||||
def _read_answer_field_from_dom(self, page, field_name: str) -> str:
|
||||
return page.evaluate(
|
||||
r"""fieldName => {
|
||||
@@ -513,7 +721,7 @@ class AnswerRunner:
|
||||
before = self._current_question_no(page)
|
||||
next_button = page.get_by_text("下一题", exact=True)
|
||||
if next_button.count() == 0:
|
||||
return False
|
||||
return self._maybe_reset_questions_from_answer_card(page, before)
|
||||
|
||||
try:
|
||||
next_button.click(timeout=3000)
|
||||
@@ -528,9 +736,119 @@ class AnswerRunner:
|
||||
)
|
||||
return True
|
||||
except Exception as error:
|
||||
if self._maybe_reset_questions_from_answer_card(page, before):
|
||||
return True
|
||||
self._print_debug_page(page, f"Next question did not advance: {error}")
|
||||
raise PageDebugPrintedError("Next question did not advance.") from error
|
||||
|
||||
def _maybe_reset_questions_from_answer_card(self, page, before: str) -> bool:
|
||||
if not self._open_answer_card(page):
|
||||
print("No next question and answer card was not found.")
|
||||
return False
|
||||
|
||||
reset_text = self._find_reset_question_text(page)
|
||||
if not reset_text:
|
||||
print("No next question and no reset/refresh entry found in answer card.")
|
||||
return False
|
||||
|
||||
if not self._confirm_reset_questions(reset_text):
|
||||
print("Question reset skipped by user.")
|
||||
return False
|
||||
|
||||
if not self._click_reset_questions(page, reset_text):
|
||||
self._print_debug_page(page, f"Reset entry was found but could not be clicked: {reset_text}")
|
||||
raise PageDebugPrintedError("Reset entry could not be clicked.")
|
||||
|
||||
self._confirm_reset_dialog_if_present(page)
|
||||
try:
|
||||
page.wait_for_function(
|
||||
r"""before => {
|
||||
const text = document.body.innerText || "";
|
||||
const match = text.match(/(\d+)\./);
|
||||
return text.includes("答案解析")
|
||||
|| text.includes("下一题")
|
||||
|| (match && match[1] !== before);
|
||||
}""",
|
||||
arg=before,
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
return True
|
||||
except Exception as error:
|
||||
self._print_debug_page(page, f"Question reset did not open a question: {error}")
|
||||
raise PageDebugPrintedError("Question reset did not open a question.") from error
|
||||
|
||||
def _open_answer_card(self, page) -> bool:
|
||||
answer_card = page.get_by_text("答题卡", exact=True)
|
||||
if answer_card.count() == 0:
|
||||
return False
|
||||
|
||||
try:
|
||||
answer_card.first.click(timeout=3000)
|
||||
page.wait_for_timeout(500)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _find_reset_question_text(self, page) -> str:
|
||||
return page.evaluate(
|
||||
r"""() => {
|
||||
const pattern = /(刷新题目|重置题目|重置|重新练习|重新开始|再练一次)/;
|
||||
const elements = Array.from(document.querySelectorAll("button, li, div, span, p"));
|
||||
for (const element of elements) {
|
||||
const text = (element.innerText || element.textContent || "").trim();
|
||||
if (!pattern.test(text)) continue;
|
||||
|
||||
const rect = element.getBoundingClientRect();
|
||||
const style = window.getComputedStyle(element);
|
||||
if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue;
|
||||
return text;
|
||||
}
|
||||
return "";
|
||||
}"""
|
||||
).strip()
|
||||
|
||||
def _confirm_reset_questions(self, reset_text: str) -> bool:
|
||||
try:
|
||||
answer = input(
|
||||
f"No more questions. Found reset entry {reset_text!r}. Reset questions? [y/N]: "
|
||||
)
|
||||
except EOFError:
|
||||
return False
|
||||
return answer.strip().lower() in {"y", "yes"}
|
||||
|
||||
def _click_reset_questions(self, page, reset_text: str) -> bool:
|
||||
return bool(
|
||||
page.evaluate(
|
||||
r"""resetText => {
|
||||
const pattern = /(刷新题目|重置题目|重置|重新练习|重新开始|再练一次)/;
|
||||
const elements = Array.from(document.querySelectorAll("button, li, div, span, p"));
|
||||
for (const element of elements) {
|
||||
const text = (element.innerText || element.textContent || "").trim();
|
||||
if (text !== resetText && !pattern.test(text)) continue;
|
||||
|
||||
const rect = element.getBoundingClientRect();
|
||||
const style = window.getComputedStyle(element);
|
||||
if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue;
|
||||
|
||||
const target = element.closest("button, li, .van-button, .van-cell") || element;
|
||||
target.click();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}""",
|
||||
reset_text,
|
||||
)
|
||||
)
|
||||
|
||||
def _confirm_reset_dialog_if_present(self, page) -> None:
|
||||
page.wait_for_timeout(500)
|
||||
for text in ("确定", "确认", "继续"):
|
||||
button = page.get_by_role("button", name=text)
|
||||
if button.count() > 0:
|
||||
button.first.click(timeout=3000)
|
||||
page.wait_for_timeout(500)
|
||||
return
|
||||
|
||||
def _current_question_no(self, page) -> str:
|
||||
text = page.locator("body").inner_text(timeout=self.settings.timeout_ms)
|
||||
match = re.search(r"(\d+)\.", text)
|
||||
|
||||
Reference in New Issue
Block a user