From 68bc20719555b9815c800f46426ee33a3da1f94f Mon Sep 17 00:00:00 2001 From: chuan Date: Thu, 11 Jun 2026 23:07:58 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=96=B0=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E7=BD=91=E9=A1=B5=E4=B8=8D=E9=80=82=E9=85=8D=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/auto_answer/runner.py | 350 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 334 insertions(+), 16 deletions(-) diff --git a/src/auto_answer/runner.py b/src/auto_answer/runner.py index 34af013..7c8a7f4 100644 --- a/src/auto_answer/runner.py +++ b/src/auto_answer/runner.py @@ -80,10 +80,18 @@ class AnswerRunner: def _go_login(self, page) -> None: relogin = page.get_by_role("button", name="重新登录") if relogin.count() > 0: + print("Login expired dialog detected. Clicking relogin.") relogin.first.click() time.sleep(1) + else: + relogin_text = page.get_by_text("重新登录", exact=True) + if relogin_text.count() > 0: + print("Login expired dialog detected. Clicking relogin.") + relogin_text.first.click() + time.sleep(1) if "/login" not in page.url: + print("Opening login page.") page.goto(self.settings.login_url) page.wait_for_load_state("domcontentloaded") @@ -162,10 +170,20 @@ class AnswerRunner: page.wait_for_event("close", timeout=0) def _select_question_bank(self, page, keyword: str) -> None: + if "/question/bank" in page.url: + self._recover_from_question_bank_picker(page) + self._wait_for_question_bank_tabs(page) - tabs = page.get_by_role("tab") + tabs = page.locator("[role='tab']") count = tabs.count() + + if "/question/bank" in page.url: + self._recover_from_question_bank_picker(page) + tabs = page.locator("[role='tab']") + count = tabs.count() + if count == 0: + self._print_debug_page(page, "No question bank tabs found") raise SystemExit("No question bank tabs found.") names = [tabs.nth(index).inner_text().strip() for index in range(count)] @@ -177,23 +195,192 @@ class AnswerRunner: tabs.nth(best_index).click() page.wait_for_load_state("networkidle") - def _goto_target_page(self, page) -> None: - page.goto(self.settings.target_url) - page.wait_for_load_state("networkidle") - if self._needs_login(page): - return + def _recover_from_question_bank_picker(self, page) -> None: + print("Question bank picker is open. Returning to practice entry page.") + self._close_question_bank_search_modal(page) + if self._click_question_bank_back(page): + try: + self._wait_for_question_bank_tabs(page) + return + except Exception: + pass + + page.goto(self.settings.target_url, wait_until="domcontentloaded") self._wait_for_question_bank_tabs(page) - def _wait_for_question_bank_tabs(self, page) -> None: + def _click_question_bank_back(self, page) -> bool: + if "/question/bank" not in page.url: + return False + + clicked = page.evaluate( + r"""() => { + const candidates = Array.from(document.querySelectorAll(".van-nav-bar__left, .left-icon")); + const target = candidates.find(element => { + const rect = element.getBoundingClientRect(); + const style = window.getComputedStyle(element); + return rect.width > 0 && rect.height > 0 && style.visibility !== "hidden"; + }); + if (!target) return false; + target.click(); + return true; + }""" + ) + if clicked: + page.wait_for_timeout(1000) + return bool(clicked) + + def _close_question_bank_search_modal(self, page) -> None: + try: + page.evaluate( + r"""() => { + const candidates = Array.from(document.querySelectorAll(".van-popup .van-icon-cross, .van-popup .close, .van-icon-cross, [class*='close']")); + const visible = candidates.find(element => { + const rect = element.getBoundingClientRect(); + const style = window.getComputedStyle(element); + return rect.width > 0 && rect.height > 0 && style.visibility !== "hidden"; + }); + if (visible) visible.click(); + }""" + ) + except Exception: + pass + page.wait_for_timeout(500) + + def _select_question_bank_from_picker(self, page, keyword: str) -> None: + if not self._click_question_bank_by_text(page, keyword): + self._print_debug_page(page, f"Question bank keyword not found: {keyword!r}") + raise SystemExit(f"Question bank keyword not found: {keyword!r}") + + try: + self._wait_for_question_bank_tabs(page) + except Exception as error: + self._print_debug_page( + page, + f"Question bank was clicked but practice page did not load: {error}", + ) + raise PageDebugPrintedError( + "Question bank was clicked but practice page did not load." + ) from error + + def _open_question_bank_selector(self, page) -> bool: + reselect = page.get_by_text("重选题库", exact=True) + if reselect.count() == 0: + return False + + reselect.first.click() page.wait_for_function( - """() => { - const text = document.body.innerText; - const tabs = document.querySelectorAll('[role="tab"]'); - return tabs.length > 0 || text.includes('顺序练习'); + "() => location.href.includes('/question/bank')", + timeout=self.settings.timeout_ms, + ) + return True + + def _click_question_bank_by_text(self, page, keyword: str) -> bool: + self._wait_for_question_bank_picker(page) + if self._click_visible_question_bank_candidate(page, keyword): + print(f"Question bank selected by text: keyword={keyword!r}") + page.wait_for_timeout(1000) + return True + + search = page.locator("input[placeholder='请输入题库名称']").first + if search.count() > 0: + search.fill(keyword) + search.press("Enter") + search_icon = page.locator(".el-icon-search").first + if search_icon.count() > 0: + search_icon.click() + page.wait_for_timeout(3000) + if self._click_visible_question_bank_candidate(page, keyword): + print(f"Question bank selected by text: keyword={keyword!r}") + page.wait_for_timeout(1000) + return True + + return False + + def _wait_for_question_bank_picker(self, page) -> None: + page.wait_for_function( + r"""() => { + const text = document.body.innerText || ""; + const hasItem = document.querySelectorAll(".first_level_item, .van-cell, .el-tree-node__label, [role='tab']").length > 0; + return hasItem || (text.includes("暂无数据") && !text.includes("加载中")); }""", timeout=self.settings.timeout_ms, ) + def _click_visible_question_bank_candidate(self, page, keyword: str) -> bool: + normalized_keyword = self._normalize_text(keyword) + return bool(page.evaluate( + r"""keyword => { + const normalize = value => value.toLowerCase().replace(/\s+/g, ""); + const blocked = /开始练习|顺序练习|模拟考试|正式考试|我的收藏|我的错题|模拟记录/; + const elements = Array.from(document.querySelectorAll("[role='tab'], .van-cell, .el-tree-node__label, .first_level_item, p")); + const candidates = []; + + for (const element of elements) { + const text = (element.innerText || element.textContent || "").trim(); + if (!text || blocked.test(text) || !normalize(text).includes(keyword)) continue; + + const rect = element.getBoundingClientRect(); + const style = window.getComputedStyle(element); + if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue; + + candidates.push({ element, textLength: text.length, area: rect.width * rect.height }); + } + + candidates.sort((a, b) => a.textLength - b.textLength || a.area - b.area); + const target = candidates[0]?.element?.closest(".first_level_item, .van-cell, .el-tree-node, [role='tab']") || candidates[0]?.element; + if (!target) return false; + target.click(); + return true; + }""", + normalized_keyword, + )) + + def _goto_target_page(self, page) -> None: + page.goto(self.settings.target_url, wait_until="domcontentloaded") + try: + page.wait_for_function( + r"""() => { + const text = document.body?.innerText || ""; + const tabs = document.querySelectorAll('[role="tab"]'); + return tabs.length > 0 + || text.includes("登录状态已过期") + || text.includes("重新登录") + || location.href.includes("/question/bank") + || location.href.includes("/login"); + }""", + timeout=self.settings.timeout_ms, + ) + except Exception as error: + if not self._needs_login(page): + self._print_debug_page(page, f"Target page did not become ready: {error}") + raise PageDebugPrintedError("Target page did not become ready.") from error + + if self._needs_login(page): + return + + if "/question/bank" in page.url: + return + + self._wait_for_question_bank_tabs(page) + + def _wait_for_question_bank_tabs(self, page) -> None: + last_error: Exception | None = None + for attempt in range(2): + try: + page.wait_for_function( + """() => document.querySelectorAll('[role="tab"]').length > 0""", + timeout=self.settings.timeout_ms, + ) + return + except Exception as error: + last_error = error + if attempt == 0 and "/app/autonomous/test/index" in page.url: + print("Question bank tabs did not load. Reloading practice entry page.") + page.reload(wait_until="domcontentloaded") + + if last_error: + raise last_error + def _best_match(self, keyword: str, names: list[str]) -> tuple[int, str, float]: normalized_keyword = self._normalize_text(keyword) best_index = 0 @@ -218,11 +405,25 @@ class AnswerRunner: def _start_sequential_practice(self, page) -> None: start = page.get_by_text("开始练习", exact=True).first if start.count() == 0: + self._print_debug_page(page, "Sequential practice button not found") raise SystemExit("Sequential practice button not found.") start.click() - page.wait_for_url("**/practice/topics**") - page.wait_for_load_state("networkidle") + try: + page.wait_for_function( + r"""() => { + const text = document.body.innerText || ""; + return location.href.includes("/practice/topics") + || text.includes("答案解析") + || text.includes("下一题") + || /正确答案\s*[::]/.test(text); + }""", + timeout=self.settings.timeout_ms, + ) + page.wait_for_load_state("networkidle") + except Exception as error: + self._print_debug_page(page, f"Sequential practice did not open: {error}") + raise PageDebugPrintedError("Sequential practice did not open.") from error def _answer_questions(self, page, count: int) -> None: answered = 0 @@ -232,8 +433,8 @@ class AnswerRunner: answer, revealed_analysis = self._reveal_and_read_answer(page) total = "until end" if count < 0 else str(count) print(f"Answering question {answered + 1}/{total}: {answer}") - if revealed_analysis: - print("Answer analysis opened before selection; skipping selection verification.") + if self._has_my_answer(page): + print("Question already answered; skipping selection verification.") else: self._select_answer_options(page, answer) self._wait_after_answer() @@ -317,6 +518,13 @@ class AnswerRunner: def _read_my_answer_from_dom(self, page) -> str: return self._read_answer_field_from_dom(page, "我的答案") + def _has_my_answer(self, page) -> bool: + if self._read_my_answer_from_dom(page): + return True + + text = page.locator("body").inner_text(timeout=self.settings.timeout_ms) + return bool(re.search(r"我的答案\s*[::]\s*\S+", text)) + def _read_answer_field_from_dom(self, page, field_name: str) -> str: return page.evaluate( r"""fieldName => { @@ -513,7 +721,7 @@ class AnswerRunner: before = self._current_question_no(page) next_button = page.get_by_text("下一题", exact=True) if next_button.count() == 0: - return False + return self._maybe_reset_questions_from_answer_card(page, before) try: next_button.click(timeout=3000) @@ -528,9 +736,119 @@ class AnswerRunner: ) return True except Exception as error: + if self._maybe_reset_questions_from_answer_card(page, before): + return True self._print_debug_page(page, f"Next question did not advance: {error}") raise PageDebugPrintedError("Next question did not advance.") from error + def _maybe_reset_questions_from_answer_card(self, page, before: str) -> bool: + if not self._open_answer_card(page): + print("No next question and answer card was not found.") + return False + + reset_text = self._find_reset_question_text(page) + if not reset_text: + print("No next question and no reset/refresh entry found in answer card.") + return False + + if not self._confirm_reset_questions(reset_text): + print("Question reset skipped by user.") + return False + + if not self._click_reset_questions(page, reset_text): + self._print_debug_page(page, f"Reset entry was found but could not be clicked: {reset_text}") + raise PageDebugPrintedError("Reset entry could not be clicked.") + + self._confirm_reset_dialog_if_present(page) + try: + page.wait_for_function( + r"""before => { + const text = document.body.innerText || ""; + const match = text.match(/(\d+)\./); + return text.includes("答案解析") + || text.includes("下一题") + || (match && match[1] !== before); + }""", + arg=before, + timeout=self.settings.timeout_ms, + ) + return True + except Exception as error: + self._print_debug_page(page, f"Question reset did not open a question: {error}") + raise PageDebugPrintedError("Question reset did not open a question.") from error + + def _open_answer_card(self, page) -> bool: + answer_card = page.get_by_text("答题卡", exact=True) + if answer_card.count() == 0: + return False + + try: + answer_card.first.click(timeout=3000) + page.wait_for_timeout(500) + return True + except Exception: + return False + + def _find_reset_question_text(self, page) -> str: + return page.evaluate( + r"""() => { + const pattern = /(刷新题目|重置题目|重置|重新练习|重新开始|再练一次)/; + const elements = Array.from(document.querySelectorAll("button, li, div, span, p")); + for (const element of elements) { + const text = (element.innerText || element.textContent || "").trim(); + if (!pattern.test(text)) continue; + + const rect = element.getBoundingClientRect(); + const style = window.getComputedStyle(element); + if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue; + return text; + } + return ""; + }""" + ).strip() + + def _confirm_reset_questions(self, reset_text: str) -> bool: + try: + answer = input( + f"No more questions. Found reset entry {reset_text!r}. Reset questions? [y/N]: " + ) + except EOFError: + return False + return answer.strip().lower() in {"y", "yes"} + + def _click_reset_questions(self, page, reset_text: str) -> bool: + return bool( + page.evaluate( + r"""resetText => { + const pattern = /(刷新题目|重置题目|重置|重新练习|重新开始|再练一次)/; + const elements = Array.from(document.querySelectorAll("button, li, div, span, p")); + for (const element of elements) { + const text = (element.innerText || element.textContent || "").trim(); + if (text !== resetText && !pattern.test(text)) continue; + + const rect = element.getBoundingClientRect(); + const style = window.getComputedStyle(element); + if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue; + + const target = element.closest("button, li, .van-button, .van-cell") || element; + target.click(); + return true; + } + return false; + }""", + reset_text, + ) + ) + + def _confirm_reset_dialog_if_present(self, page) -> None: + page.wait_for_timeout(500) + for text in ("确定", "确认", "继续"): + button = page.get_by_role("button", name=text) + if button.count() > 0: + button.first.click(timeout=3000) + page.wait_for_timeout(500) + return + def _current_question_no(self, page) -> str: text = page.locator("body").inner_text(timeout=self.settings.timeout_ms) match = re.search(r"(\d+)\.", text)