fix: 修复最后结束没有退出的问题
This commit is contained in:
+173
-27
@@ -12,6 +12,10 @@ from auto_answer.config import Settings
|
||||
from auto_answer.solver import Solver
|
||||
|
||||
|
||||
class PageDebugPrintedError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class AnswerRunner:
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
self.settings = settings
|
||||
@@ -142,6 +146,8 @@ class AnswerRunner:
|
||||
self._start_sequential_practice(page)
|
||||
if self.settings.answer_count != 0:
|
||||
self._answer_questions(page, self.settings.answer_count)
|
||||
self._print_page_summary(page)
|
||||
return
|
||||
|
||||
self._print_page_summary(page)
|
||||
print("Practice page is ready. Close Edge when you are done.")
|
||||
@@ -221,16 +227,25 @@ class AnswerRunner:
|
||||
def _answer_questions(self, page, count: int) -> None:
|
||||
answered = 0
|
||||
while count < 0 or answered < count:
|
||||
self._wait_before_answer()
|
||||
answer = self._reveal_and_read_answer(page)
|
||||
total = "until end" if count < 0 else str(count)
|
||||
print(f"Answering question {answered + 1}/{total}: {answer}")
|
||||
self._select_answer_options(page, answer)
|
||||
self._wait_after_answer()
|
||||
if not self._go_next_question(page):
|
||||
print("No next question detected. Stopping.")
|
||||
return
|
||||
answered += 1
|
||||
try:
|
||||
self._wait_before_answer()
|
||||
answer = self._reveal_and_read_answer(page)
|
||||
total = "until end" if count < 0 else str(count)
|
||||
print(f"Answering question {answered + 1}/{total}: {answer}")
|
||||
self._select_answer_options(page, answer)
|
||||
self._wait_after_answer()
|
||||
self._verify_answer_result(page, answer)
|
||||
if not self._go_next_question(page):
|
||||
print("No next question detected. Stopping.")
|
||||
return
|
||||
answered += 1
|
||||
except PageDebugPrintedError as error:
|
||||
raise SystemExit(str(error)) from error
|
||||
except SystemExit:
|
||||
raise
|
||||
except Exception as error:
|
||||
self._print_debug_page(page, f"Unexpected answer loop error: {error}")
|
||||
raise SystemExit(f"Unexpected answer loop error: {error}") from error
|
||||
|
||||
def _wait_before_answer(self) -> None:
|
||||
minimum, maximum = self._parse_delay_range(self.settings.answer_wait_s)
|
||||
@@ -260,41 +275,141 @@ class AnswerRunner:
|
||||
def _reveal_and_read_answer(self, page) -> str:
|
||||
body = page.locator("body")
|
||||
text = body.inner_text(timeout=self.settings.timeout_ms)
|
||||
if "正确答案:" not in text:
|
||||
if not re.search(r"正确答案\s*[::]", text):
|
||||
page.get_by_text("答案解析", exact=True).click()
|
||||
page.wait_for_function(
|
||||
"() => document.body.innerText.includes('正确答案:')",
|
||||
"() => /正确答案\\s*[::]/.test(document.body.innerText)",
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
text = body.inner_text(timeout=self.settings.timeout_ms)
|
||||
|
||||
match = re.search(r"正确答案[::]\s*([^\r\n]+)", text)
|
||||
match = re.search(
|
||||
r"正确答案\s*[::]\s*(.*?)(?=\s*(?:参考解析|答案解析|答题卡|我的答案|$))",
|
||||
text,
|
||||
re.S,
|
||||
)
|
||||
if not match:
|
||||
self._print_debug_page(page, "Correct answer regex did not match")
|
||||
raise SystemExit("Correct answer not found after opening answer analysis.")
|
||||
raise PageDebugPrintedError(
|
||||
"Correct answer not found after opening answer analysis."
|
||||
)
|
||||
|
||||
raw_answer = match.group(1).strip()
|
||||
letter_answer = "".join(re.findall(r"[A-Z]", raw_answer.upper()))
|
||||
return letter_answer or raw_answer
|
||||
return self._parse_answer_text(raw_answer)
|
||||
|
||||
def _parse_answer_text(self, raw_answer: str) -> str:
|
||||
answer_text = self._normalize_answer_text(raw_answer)
|
||||
if not answer_text:
|
||||
raise PageDebugPrintedError("Correct answer is empty.")
|
||||
|
||||
answer_text = re.split(r"\s*(?:解析|说明)\s*[::]", answer_text, maxsplit=1)[0]
|
||||
letter_tokens = self._extract_letter_answer(answer_text)
|
||||
if letter_tokens:
|
||||
return letter_tokens
|
||||
|
||||
return answer_text.strip(" \t\r\n,,、;;。.")
|
||||
|
||||
def _normalize_answer_text(self, text: str) -> str:
|
||||
return text.translate(
|
||||
str.maketrans(
|
||||
{
|
||||
"A": "A",
|
||||
"B": "B",
|
||||
"C": "C",
|
||||
"D": "D",
|
||||
"E": "E",
|
||||
"F": "F",
|
||||
"G": "G",
|
||||
"a": "A",
|
||||
"b": "B",
|
||||
"c": "C",
|
||||
"d": "D",
|
||||
"e": "E",
|
||||
"f": "F",
|
||||
"g": "G",
|
||||
".": ".",
|
||||
}
|
||||
)
|
||||
).strip()
|
||||
|
||||
def _extract_letter_answer(self, answer_text: str) -> str:
|
||||
text = answer_text.upper().strip()
|
||||
compact = re.sub(r"[\s,,、;;/|]+", "", text)
|
||||
if re.fullmatch(r"[A-G]{1,7}", compact):
|
||||
return compact
|
||||
|
||||
tokens = re.findall(
|
||||
r"(?<![A-Z0-9])([A-G])(?=\s*(?:[、,,;;/|]|和|及|与|$))",
|
||||
text,
|
||||
)
|
||||
return "".join(tokens)
|
||||
|
||||
def _select_answer_options(self, page, answer: str) -> None:
|
||||
if not answer:
|
||||
raise SystemExit("Empty answer.")
|
||||
raise PageDebugPrintedError("Empty answer.")
|
||||
|
||||
if not re.fullmatch(r"[A-Z]+", answer):
|
||||
self._select_text_answer_option(page, answer)
|
||||
return
|
||||
|
||||
for letter in answer:
|
||||
option = page.locator("li").filter(has_text=re.compile(rf"^\s*{letter}\s*\."))
|
||||
if option.count() == 0:
|
||||
option = page.get_by_text(re.compile(rf"^\s*{letter}\s*\.")).locator("..")
|
||||
if option.count() == 0:
|
||||
if not self._click_letter_answer_option(page, letter):
|
||||
self._print_debug_page(page, f"Option not found: {letter}")
|
||||
raise SystemExit(f"Option not found: {letter}")
|
||||
option.first.click()
|
||||
raise PageDebugPrintedError(f"Option not found: {letter}")
|
||||
time.sleep(0.2)
|
||||
|
||||
def _click_letter_answer_option(self, page, letter: str) -> bool:
|
||||
option_prefix = re.compile(rf"^\s*{letter}\s*(?:[.、.])", re.I)
|
||||
options = page.locator("li")
|
||||
for index in range(options.count()):
|
||||
option = options.nth(index)
|
||||
try:
|
||||
text = option.inner_text(timeout=1000).strip()
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not option_prefix.search(text):
|
||||
continue
|
||||
if any(label in text for label in ("正确答案", "参考解析", "答案解析", "答题卡")):
|
||||
continue
|
||||
|
||||
option.click()
|
||||
return True
|
||||
|
||||
return bool(
|
||||
page.evaluate(
|
||||
r"""letter => {
|
||||
const prefix = new RegExp(`^\\s*${letter}\\s*(?:[.、.])`, "i");
|
||||
const blocked = /正确答案|参考解析|答案解析|答题卡/;
|
||||
const selectors = [
|
||||
"label",
|
||||
"[role='radio']",
|
||||
"[role='checkbox']",
|
||||
".van-radio",
|
||||
".van-checkbox",
|
||||
".option",
|
||||
"li",
|
||||
];
|
||||
const seen = new Set();
|
||||
for (const selector of selectors) {
|
||||
for (const element of document.querySelectorAll(selector)) {
|
||||
if (seen.has(element)) continue;
|
||||
seen.add(element);
|
||||
const rect = element.getBoundingClientRect();
|
||||
const style = window.getComputedStyle(element);
|
||||
const text = (element.innerText || element.textContent || "").trim();
|
||||
if (!text || blocked.test(text) || !prefix.test(text)) continue;
|
||||
if (rect.width <= 0 || rect.height <= 0 || style.visibility === "hidden") continue;
|
||||
element.click();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}""",
|
||||
letter,
|
||||
)
|
||||
)
|
||||
|
||||
def _select_text_answer_option(self, page, answer: str) -> None:
|
||||
options = page.locator("li")
|
||||
for index in range(options.count()):
|
||||
@@ -308,7 +423,38 @@ class AnswerRunner:
|
||||
continue
|
||||
|
||||
self._print_debug_page(page, f"Text answer option not found: {answer}")
|
||||
raise SystemExit(f"Text answer option not found: {answer}")
|
||||
raise PageDebugPrintedError(f"Text answer option not found: {answer}")
|
||||
|
||||
def _verify_answer_result(self, page, expected_answer: str) -> None:
|
||||
text = page.locator("body").inner_text(timeout=self.settings.timeout_ms)
|
||||
match = re.search(
|
||||
r"我的答案\s*[::]\s*(.*?)(?=\s*(?:正确答案|参考解析|答案解析|答题卡|$))",
|
||||
text,
|
||||
re.S,
|
||||
)
|
||||
if not match:
|
||||
self._print_debug_page(page, "My answer field not found after selecting answer")
|
||||
raise PageDebugPrintedError("My answer field not found after selecting answer.")
|
||||
|
||||
raw_actual_answer = match.group(1).strip()
|
||||
if not raw_actual_answer:
|
||||
self._print_debug_page(
|
||||
page,
|
||||
f"Selected answer mismatch: expected={expected_answer}, actual=<empty>",
|
||||
)
|
||||
raise PageDebugPrintedError(
|
||||
f"Selected answer mismatch: expected={expected_answer}, actual=<empty>"
|
||||
)
|
||||
|
||||
actual_answer = self._parse_answer_text(raw_actual_answer)
|
||||
if actual_answer != expected_answer:
|
||||
self._print_debug_page(
|
||||
page,
|
||||
f"Selected answer mismatch: expected={expected_answer}, actual={actual_answer}",
|
||||
)
|
||||
raise PageDebugPrintedError(
|
||||
f"Selected answer mismatch: expected={expected_answer}, actual={actual_answer}"
|
||||
)
|
||||
|
||||
def _go_next_question(self, page) -> bool:
|
||||
before = self._current_question_no(page)
|
||||
@@ -328,9 +474,9 @@ class AnswerRunner:
|
||||
timeout=self.settings.timeout_ms,
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
self._print_debug_page(page, "Next question did not advance")
|
||||
return False
|
||||
except Exception as error:
|
||||
self._print_debug_page(page, f"Next question did not advance: {error}")
|
||||
raise PageDebugPrintedError("Next question did not advance.") from error
|
||||
|
||||
def _current_question_no(self, page) -> str:
|
||||
text = page.locator("body").inner_text(timeout=self.settings.timeout_ms)
|
||||
|
||||
Reference in New Issue
Block a user