feat: 完成基础功能

This commit is contained in:
chuan
2026-06-02 23:56:19 +08:00
Unverified
parent e80539af4c
commit 1b7c87f2f2
8 changed files with 564 additions and 17 deletions
+11 -1
View File
@@ -1,3 +1,13 @@
TARGET_URL=https://example.com
LOGIN_URL=https://sia.sinopec.com/mobile/#/login
TARGET_URL=https://sia.sinopec.com/mobile/#/app/autonomous/test/index?fromName=index
HEADLESS=false
TIMEOUT_MS=30000
BROWSER_CHANNEL=msedge
USER_DATA_DIR=.auth/edge
STORAGE_STATE_PATH=.auth/storage_state.json
SESSION_STATE_PATH=.auth/session_storage.json
LOGIN_USERNAME=
LOGIN_PASSWORD=
QUESTION_BANK=测录融合
ANSWER_COUNT=0
ANSWER_DELAY_MS=0
+6
View File
@@ -8,3 +8,9 @@ wheels/
# Virtual environments
.venv
.env
# Local browser login state
.auth/
.playwright-mcp/
+32 -1
View File
@@ -14,7 +14,38 @@ uv run playwright install chromium
## 运行
```powershell
uv run auto-answer https://example.com
uv run auto-answer login
uv run auto-answer run
```
打开自主练测页,如果登录过期则自动登录:
```powershell
uv run auto-answer practice
```
指定题库关键词,程序会自动选择匹配度最高的题库 tab,并进入顺序练习:
```powershell
uv run auto-answer practice --bank "测录融合"
```
进入顺序练习后自动答指定数量的题:
```powershell
uv run auto-answer practice --bank "测录融合" --answer-count 10
```
一直答到没有下一题,并在每题后等待 500ms:
```powershell
uv run auto-answer practice --bank "测录融合" --answer-count -1 --delay-ms 500
```
连续登录并复用同一个 Edge 窗口:
```powershell
uv run auto-answer start
```
也可以将 `.env.example` 复制为 `.env`,然后直接运行:
+83 -2
View File
@@ -2,8 +2,10 @@ from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
import json
from pathlib import Path
from playwright.sync_api import Browser, Page, sync_playwright
from playwright.sync_api import Browser, BrowserContext, Page, sync_playwright
from auto_answer.config import Settings
@@ -11,10 +13,89 @@ from auto_answer.config import Settings
@contextmanager
def open_page(settings: Settings) -> Iterator[Page]:
    """Yield a page backed by a persistent browser profile.

    The profile directory is ``settings.user_data_dir`` and the browser is
    started through ``settings.browser_channel``. The first page of the
    persistent context is reused when one exists; otherwise a new page is
    created. The context is closed when the ``with`` block exits.

    Args:
        settings: Supplies ``user_data_dir``, ``browser_channel``,
            ``headless`` and ``timeout_ms``.

    Yields:
        A page whose default timeout is ``settings.timeout_ms``.
    """
    with sync_playwright() as playwright:
        # Only the persistent context is launched. A separate
        # ``chromium.launch()`` here would start a second browser that is
        # never closed.
        context: BrowserContext = playwright.chromium.launch_persistent_context(
            user_data_dir=Path(settings.user_data_dir),
            channel=settings.browser_channel,
            headless=settings.headless,
        )
        try:
            # Page setup sits inside the try so the context is still closed
            # if creating or configuring the page fails.
            page = context.pages[0] if context.pages else context.new_page()
            page.set_default_timeout(settings.timeout_ms)
            yield page
        finally:
            context.close()
@contextmanager
def open_fresh_page(settings: Settings) -> Iterator[Page]:
    """Yield a page from a newly launched browser with no saved state.

    Args:
        settings: Supplies ``browser_channel``, ``headless`` and
            ``timeout_ms``.

    Yields:
        A new page whose default timeout is ``settings.timeout_ms``. The
        browser is closed when the ``with`` block exits.
    """
    with sync_playwright() as playwright:
        launch_options = {
            "channel": settings.browser_channel,
            "headless": settings.headless,
        }
        browser: Browser = playwright.chromium.launch(**launch_options)
        fresh_page = browser.new_page()
        fresh_page.set_default_timeout(settings.timeout_ms)
        try:
            yield fresh_page
        finally:
            browser.close()
@contextmanager
def open_login_page(settings: Settings) -> Iterator[tuple[BrowserContext, Page]]:
    """Yield a visible browser context and page, restoring saved state if any.

    The browser is always launched with ``headless=False``. When the file at
    ``settings.storage_state_path`` exists it is passed to the new context as
    ``storage_state``. When the file at ``settings.session_state_path`` exists
    its JSON is embedded in an init script that copies the entries stored for
    the current origin into ``sessionStorage``.

    Args:
        settings: Supplies ``browser_channel``, ``storage_state_path``,
            ``session_state_path`` and ``timeout_ms``.

    Yields:
        ``(context, page)``. The browser is closed when the block exits.
    """
    with sync_playwright() as playwright:
        browser: Browser = playwright.chromium.launch(
            channel=settings.browser_channel,
            headless=False,
        )

        saved_state = Path(settings.storage_state_path)
        context_kwargs = {"storage_state": saved_state} if saved_state.exists() else {}
        context = browser.new_context(**context_kwargs)

        saved_session = Path(settings.session_state_path)
        if saved_session.exists():
            # Round-trip through json so the text embedded below is a valid
            # JavaScript object literal.
            session_data = json.dumps(json.loads(saved_session.read_text()))
            restore_script = f"""
                (() => {{
                    const data = {session_data};
                    for (const [origin, values] of Object.entries(data)) {{
                        if (location.origin !== origin) continue;
                        for (const [key, value] of Object.entries(values)) {{
                            sessionStorage.setItem(key, value);
                        }}
                    }}
                }})();
                """
            context.add_init_script(script=restore_script)

        login_page = context.new_page()
        login_page.set_default_timeout(settings.timeout_ms)
        try:
            yield context, login_page
        finally:
            browser.close()
@contextmanager
def open_page_with_state(settings: Settings) -> Iterator[Page]:
    """Yield a page in a context loaded from the saved storage state.

    Args:
        settings: Supplies ``storage_state_path``, ``browser_channel``,
            ``headless`` and ``timeout_ms``.

    Yields:
        A page whose default timeout is ``settings.timeout_ms``. The browser
        is closed when the ``with`` block exits.

    Raises:
        SystemExit: If the storage-state file does not exist.
    """
    state_path = Path(settings.storage_state_path)
    if not state_path.exists():
        message = (
            f"Missing login state: {state_path}. Run `uv run auto-answer login` first."
        )
        raise SystemExit(message)

    with sync_playwright() as playwright:
        browser: Browser = playwright.chromium.launch(
            channel=settings.browser_channel,
            headless=settings.headless,
        )
        restored_context = browser.new_context(storage_state=state_path)
        restored_page = restored_context.new_page()
        restored_page.set_default_timeout(settings.timeout_ms)
        try:
            yield restored_page
        finally:
            browser.close()
+59 -7
View File
@@ -6,26 +6,78 @@ from auto_answer.config import Settings
from auto_answer.runner import AnswerRunner
def build_settings(args: argparse.Namespace) -> Settings:
    """Translate parsed command-line arguments into a ``Settings`` object.

    Only values actually supplied on the command line are passed on, so
    anything omitted falls back to whatever ``Settings`` itself provides.

    Args:
        args: Namespace produced by the CLI parser. ``headless``, ``url`` and
            ``command`` are read directly; ``bank``, ``answer_count`` and
            ``delay_ms`` are optional attributes.

    Returns:
        A ``Settings`` instance built from the collected overrides.
    """
    overrides: dict[str, object] = {"headless": args.headless}

    if args.url:
        # A URL given to ``login`` replaces the login page; for every other
        # command it replaces the target page.
        url_field = "login_url" if args.command == "login" else "target_url"
        overrides[url_field] = args.url

    bank = getattr(args, "bank", None)
    if bank:
        overrides["question_bank"] = bank

    # 0 is a meaningful value for both numbers, so compare against None.
    for attribute, field_name in (
        ("answer_count", "answer_count"),
        ("delay_ms", "answer_delay_ms"),
    ):
        supplied = getattr(args, attribute, None)
        if supplied is not None:
            overrides[field_name] = supplied

    return Settings(**overrides)
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the auto-answer bot.

    The global ``--headless`` flag must precede the sub-command. Every
    sub-command (``login``, ``start``, ``run``, ``practice``, ``open``)
    accepts an optional positional ``url``. ``practice`` additionally accepts
    ``--bank``, ``--answer-count`` and ``--delay-ms``.

    Returns:
        The configured parser. The chosen sub-command is stored in
        ``command``.
    """
    parser = argparse.ArgumentParser(description="Run the auto-answer bot.")
    # No top-level positional ``url``: an optional positional placed before
    # the sub-command greedily consumes the sub-command name whenever a URL
    # follows it, so ``run <url>`` would fail with "invalid choice".
    parser.add_argument(
        "--headless",
        action="store_true",
        help="Run browser in headless mode.",
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    login_parser = subparsers.add_parser("login", help="Open Edge and save login state.")
    login_parser.add_argument("url", nargs="?", help="Target page URL.")

    start_parser = subparsers.add_parser("start", help="Log in and continue in one Edge session.")
    start_parser.add_argument("url", nargs="?", help="Target page URL.")

    run_parser = subparsers.add_parser("run", help="Run the bot with saved login state.")
    run_parser.add_argument("url", nargs="?", help="Target page URL.")

    practice_parser = subparsers.add_parser(
        "practice",
        help="Open practice page, auto-login if needed, and wait.",
    )
    practice_parser.add_argument("url", nargs="?", help="Target page URL.")
    practice_parser.add_argument("--bank", help="Question bank keyword.")
    practice_parser.add_argument(
        "--answer-count",
        type=int,
        help="Number of questions to answer automatically. Use -1 to answer until the end.",
    )
    practice_parser.add_argument(
        "--delay-ms",
        type=int,
        help="Delay after each answered question, in milliseconds.",
    )

    open_parser = subparsers.add_parser("open", help="Open the target page and wait.")
    open_parser.add_argument("url", nargs="?", help="Target page URL.")

    return parser
def main() -> None:
    """Parse the command line and run the selected sub-command.

    Settings are built once from the parsed arguments, and exactly one
    ``AnswerRunner`` action is invoked, chosen by ``args.command``.
    """
    args = build_parser().parse_args()
    # Settings come from build_settings only. Constructing Settings directly
    # from args.url here would ignore the per-command URL handling and the
    # practice options.
    settings = build_settings(args)
    runner = AnswerRunner(settings)

    # Dispatch on the sub-command. runner.run() must not be called
    # unconditionally first, or every command would also execute ``run``.
    if args.command == "login":
        runner.login()
    elif args.command == "start":
        runner.start()
    elif args.command == "run":
        runner.run()
    elif args.command == "practice":
        runner.practice()
    elif args.command == "open":
        runner.open()
if __name__ == "__main__":
+22 -1
View File
@@ -3,6 +3,11 @@ from __future__ import annotations
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
DEFAULT_LOGIN_URL = "https://sia.sinopec.com/mobile/#/login"
DEFAULT_TARGET_URL = (
"https://sia.sinopec.com/mobile/#/app/autonomous/test/index?fromName=index"
)
class Settings(BaseSettings):
model_config = SettingsConfigDict(
@@ -12,6 +17,22 @@ class Settings(BaseSettings):
populate_by_name=True,
)
target_url: str | None = Field(default=None, alias="TARGET_URL")
login_url: str = Field(default=DEFAULT_LOGIN_URL, alias="LOGIN_URL")
target_url: str = Field(default=DEFAULT_TARGET_URL, alias="TARGET_URL")
headless: bool = Field(default=False, alias="HEADLESS")
timeout_ms: int = Field(default=30_000, alias="TIMEOUT_MS")
browser_channel: str = Field(default="msedge", alias="BROWSER_CHANNEL")
user_data_dir: str = Field(default=".auth/edge", alias="USER_DATA_DIR")
storage_state_path: str = Field(
default=".auth/storage_state.json",
alias="STORAGE_STATE_PATH",
)
session_state_path: str = Field(
default=".auth/session_storage.json",
alias="SESSION_STATE_PATH",
)
login_username: str | None = Field(default=None, alias="LOGIN_USERNAME")
login_password: str | None = Field(default=None, alias="LOGIN_PASSWORD")
question_bank: str | None = Field(default=None, alias="QUESTION_BANK")
answer_count: int = Field(default=0, alias="ANSWER_COUNT")
answer_delay_ms: int = Field(default=0, alias="ANSWER_DELAY_MS")
+347 -5
View File
@@ -1,6 +1,12 @@
from __future__ import annotations
from auto_answer.browser import open_page
from difflib import SequenceMatcher
import json
import re
import time
from pathlib import Path
from auto_answer.browser import open_fresh_page, open_login_page, open_page_with_state
from auto_answer.config import Settings
from auto_answer.solver import Solver
@@ -10,13 +16,349 @@ class AnswerRunner:
self.settings = settings
self.solver = Solver()
def login(self) -> None:
    """Open the login page and persist the resulting browser state.

    With both ``login_username`` and ``login_password`` configured, the form
    is submitted automatically and the state is saved once login is
    detected. Otherwise the user logs in by hand: the state is snapshotted
    every two seconds until the page is closed or a snapshot fails.

    The snapshot consists of the context storage state (written to
    ``settings.storage_state_path``) and the page's sessionStorage.
    """
    state_path = Path(self.settings.storage_state_path)
    state_path.parent.mkdir(parents=True, exist_ok=True)

    with open_login_page(self.settings) as (context, page):
        page.goto(self.settings.login_url)

        if self.settings.login_username and self.settings.login_password:
            print("Edge opened. Filling login form automatically.")
            self._submit_login(page)
            self._wait_for_login(page)
            context.storage_state(path=state_path)
            self._save_session_storage(page)
            print(f"Login state saved to {state_path}.")
            return

        print("Edge opened. Please log in, then close the Edge window.")
        saved_once = False
        while not page.is_closed():
            try:
                context.storage_state(path=state_path)
                self._save_session_storage(page)
                state = self._get_page_state(page)
            except Exception:
                # The window was most likely closed mid-snapshot; keep the
                # last snapshot that completed.
                break
            saved_once = True
            print(
                "saved state: "
                f"url={state['url']} "
                f"localStorage={state['local_storage_count']} "
                f"sessionStorage={state['session_storage_count']}"
            )
            time.sleep(2)

        # No extra snapshot here: the page is closed at this point, so
        # evaluating script on it would raise and turn a normal exit into a
        # crash. The periodic snapshot above is at most ~2 seconds old.
        if saved_once:
            print(f"Login state saved to {state_path}.")
        else:
            print("Edge was closed before any login state could be saved.")
def _submit_login(self, page) -> None:
username = self.settings.login_username
password = self.settings.login_password
if not username or not password:
raise SystemExit("Missing LOGIN_USERNAME or LOGIN_PASSWORD.")
page.wait_for_load_state("domcontentloaded")
page.locator("input:not([type='password'])").first.fill(username)
page.locator("input[type='password']").first.fill(password)
login_button = page.get_by_role("button", name="登 录").first
if login_button.count() > 0:
login_button.click()
return
login_button = page.get_by_role("button", name="登录").first
if login_button.count() > 0:
login_button.click()
return
page.locator("button").filter(has_text="").first.click()
def _go_login(self, page) -> None:
    """Bring the page to the login route.

    Clicks the "重新登录" button when one is present and pauses one second,
    then navigates to ``settings.login_url`` if the URL still lacks
    ``/login``. Finally waits for the DOM content to load.
    """
    relogin_buttons = page.get_by_role("button", name="重新登录")
    if relogin_buttons.count() > 0:
        relogin_buttons.first.click()
        time.sleep(1)

    on_login_route = "/login" in page.url
    if not on_login_route:
        page.goto(self.settings.login_url)

    page.wait_for_load_state("domcontentloaded")
def _needs_login(self, page) -> bool:
if "/login" in page.url:
return True
try:
text = page.locator("body").inner_text(timeout=2000)
except Exception:
return False
return "登录状态已过期" in text or "重新登录" in text
def start(self) -> None:
    """Log in manually and keep working in the same browser window.

    Opens the login URL in a fresh browser, waits until a login is detected,
    navigates to the target URL, prints a page summary, and then blocks
    until the page is closed.
    """
    with open_fresh_page(self.settings) as page:
        page.goto(self.settings.login_url)
        print("Edge opened. Please log in. The bot will continue in this same window.")
        self._wait_for_login(page)

        target = self.settings.target_url
        page.goto(target)
        page.wait_for_load_state("networkidle")
        self._print_page_summary(page)

        print("Keep using this Edge window. Close it when you want to stop the bot.")
        # timeout=0 disables the timeout, so this waits for the close event.
        page.wait_for_event("close", timeout=0)
def run(self) -> None:
    """Open the target page with the saved login state and solve once.

    The page title is passed to the solver as the question, and both the
    question and the returned answer are printed.
    """
    with open_page_with_state(self.settings) as page:
        page.goto(self.settings.target_url)
        page.wait_for_load_state("networkidle")
        self._print_page_summary(page)

        question = page.title()
        solved = self.solver.solve(question)
        print(f"question: {question}")
        print(f"answer: {solved}")
def practice(self) -> None:
    """Open the practice page, logging in first when necessary.

    After the page is reachable the login state is saved. When a question
    bank keyword is configured, the matching bank is selected and sequential
    practice is started; a non-zero ``answer_count`` then triggers automatic
    answering. Finally a page summary is printed and the call blocks until
    the page is closed.
    """
    # NOTE(review): the nesting below is reconstructed from a view that lost
    # indentation. Confirm that starting practice and auto-answering belong
    # under the question-bank check.
    storage_file = Path(self.settings.storage_state_path)
    storage_file.parent.mkdir(parents=True, exist_ok=True)

    with open_login_page(self.settings) as (context, page):
        self._goto_target_page(page)

        if not self._needs_login(page):
            print("Already logged in.")
            context.storage_state(path=storage_file)
            self._save_session_storage(page)
        else:
            print("Login required. Signing in...")
            self._go_login(page)
            self._submit_login(page)
            self._wait_for_login(page)
            context.storage_state(path=storage_file)
            self._save_session_storage(page)
            self._goto_target_page(page)

        bank_keyword = self.settings.question_bank
        if bank_keyword:
            self._select_question_bank(page, bank_keyword)
            self._start_sequential_practice(page)
            if self.settings.answer_count != 0:
                self._answer_questions(page, self.settings.answer_count)

        self._print_page_summary(page)
        print("Practice page is ready. Close Edge when you are done.")
        page.wait_for_event("close", timeout=0)
def open(self) -> None:
    """Open the target page with the saved login state and wait.

    Prints a page summary after the network is idle, then blocks until the
    page is closed.
    """
    with open_page_with_state(self.settings) as page:
        destination = self.settings.target_url
        page.goto(destination)
        page.wait_for_load_state("networkidle")
        self._print_page_summary(page)
        print("Page opened. Close Edge when you are done.")
        # timeout=0 disables the timeout, so this waits for the close event.
        page.wait_for_event("close", timeout=0)
def _select_question_bank(self, page, keyword: str) -> None:
self._wait_for_question_bank_tabs(page)
tabs = page.get_by_role("tab")
count = tabs.count()
if count == 0:
raise SystemExit("No question bank tabs found.")
names = [tabs.nth(index).inner_text().strip() for index in range(count)]
best_index, best_name, best_score = self._best_match(keyword, names)
print(
f"Question bank matched: keyword={keyword!r}, "
f"name={best_name!r}, score={best_score:.2f}"
)
tabs.nth(best_index).click()
page.wait_for_load_state("networkidle")
def _goto_target_page(self, page) -> None:
    """Navigate to the target URL and wait until it is usable.

    After the network is idle, the wait for the question-bank tabs is
    skipped when the page turns out to require a login.
    """
    page.goto(self.settings.target_url)
    page.wait_for_load_state("networkidle")
    if not self._needs_login(page):
        self._wait_for_question_bank_tabs(page)
def _wait_for_question_bank_tabs(self, page) -> None:
    """Block until the question-bank page has rendered.

    The page counts as rendered once it contains at least one element with
    ``role="tab"`` or its body text includes "顺序练习". The wait is bounded
    by ``settings.timeout_ms``.
    """
    tabs_rendered = """() => {
        const text = document.body.innerText;
        const tabs = document.querySelectorAll('[role="tab"]');
        return tabs.length > 0 || text.includes('顺序练习');
    }"""
    page.wait_for_function(tabs_rendered, timeout=self.settings.timeout_ms)
def _best_match(self, keyword: str, names: list[str]) -> tuple[int, str, float]:
normalized_keyword = self._normalize_text(keyword)
best_index = 0
best_score = -1.0
for index, name in enumerate(names):
normalized_name = self._normalize_text(name)
if normalized_keyword in normalized_name:
score = 1.0 + len(normalized_keyword) / max(len(normalized_name), 1)
else:
score = SequenceMatcher(None, normalized_keyword, normalized_name).ratio()
if score > best_score:
best_index = index
best_score = score
return best_index, names[best_index], best_score
def _normalize_text(self, text: str) -> str:
return "".join(text.lower().split())
def _start_sequential_practice(self, page) -> None:
start = page.get_by_text("开始练习", exact=True).first
if start.count() == 0:
raise SystemExit("Sequential practice button not found.")
start.click()
page.wait_for_url("**/practice/topics**")
page.wait_for_load_state("networkidle")
def _answer_questions(self, page, count: int) -> None:
answered = 0
while count < 0 or answered < count:
answer = self._reveal_and_read_answer(page)
total = "until end" if count < 0 else str(count)
print(f"Answering question {answered + 1}/{total}: {answer}")
self._select_answer_options(page, answer)
if not self._go_next_question(page):
print("No next question detected. Stopping.")
return
answered += 1
if self.settings.answer_delay_ms > 0:
time.sleep(self.settings.answer_delay_ms / 1000)
def _reveal_and_read_answer(self, page) -> str:
body = page.locator("body")
text = body.inner_text(timeout=self.settings.timeout_ms)
if "正确答案:" not in text:
page.get_by_text("答案解析", exact=True).click()
page.wait_for_function(
"() => document.body.innerText.includes('正确答案:')",
timeout=self.settings.timeout_ms,
)
text = body.inner_text(timeout=self.settings.timeout_ms)
match = re.search(r"正确答案[:]\s*([A-Z,,、\s]+)", text)
if not match:
raise SystemExit("Correct answer not found after opening answer analysis.")
return "".join(re.findall(r"[A-Z]", match.group(1).upper()))
def _select_answer_options(self, page, answer: str) -> None:
if not answer:
raise SystemExit("Empty answer.")
for letter in answer:
option = page.locator("li").filter(has_text=re.compile(rf"^\s*{letter}\s*\."))
if option.count() == 0:
option = page.get_by_text(re.compile(rf"^\s*{letter}\s*\.")).locator("..")
if option.count() == 0:
raise SystemExit(f"Option not found: {letter}")
option.first.click()
time.sleep(0.2)
def _go_next_question(self, page) -> bool:
before = self._current_question_no(page)
next_button = page.get_by_text("下一题", exact=True)
if next_button.count() == 0:
return False
try:
next_button.click(timeout=3000)
page.wait_for_function(
r"""before => {
const text = document.body.innerText;
const match = text.match(/(\d+)\./);
return match && match[1] !== before;
}""",
arg=before,
timeout=self.settings.timeout_ms,
)
return True
except Exception:
return False
def _current_question_no(self, page) -> str:
text = page.locator("body").inner_text(timeout=self.settings.timeout_ms)
match = re.search(r"(\d+)\.", text)
return match.group(1) if match else ""
def _wait_for_login(self, page) -> None:
last_state = ""
while True:
state = self._get_page_state(page)
if state != last_state:
print(
"state: "
f"url={state['url']} "
f"title={state['title']} "
f"text={state['text']} "
f"localStorage={state['local_storage_count']} "
f"sessionStorage={state['session_storage_count']}"
)
last_state = state
if self._looks_logged_in(state):
page.wait_for_load_state("networkidle")
print("Login detected.")
return
time.sleep(2)
def _looks_logged_in(self, state: dict[str, str | int]) -> bool:
url = str(state["url"])
text = str(state["text"])
has_storage = (
int(state["local_storage_count"]) > 0
or int(state["session_storage_count"]) > 0
)
left_login_route = "/login" not in url
has_app_text = any(word in text for word in ("考试", "答题", "学习", "自主", "测试"))
return left_login_route or (has_storage and has_app_text)
def _get_page_state(self, page) -> dict[str, str | int]:
text = ""
try:
text = " ".join(page.locator("body").inner_text(timeout=1000).split())[:200]
except Exception:
pass
storage = page.evaluate(
"""() => ({
localStorageCount: localStorage.length,
sessionStorageCount: sessionStorage.length
})"""
)
return {
"url": page.url,
"title": page.title(),
"text": text,
"local_storage_count": storage["localStorageCount"],
"session_storage_count": storage["sessionStorageCount"],
}
def _save_session_storage(self, page) -> None:
state_path = Path(self.settings.session_state_path)
state_path.parent.mkdir(parents=True, exist_ok=True)
data = page.evaluate(
"""() => {
const values = {};
for (let index = 0; index < sessionStorage.length; index += 1) {
const key = sessionStorage.key(index);
values[key] = sessionStorage.getItem(key);
}
return { [location.origin]: values };
}"""
)
state_path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
def _print_page_summary(self, page) -> None:
text = page.locator("body").inner_text(timeout=self.settings.timeout_ms)
summary = " ".join(text.split())[:1000]
print(f"url: {page.url}")
print(f"title: {page.title()}")
print(f"text: {summary}")
+4
View File
@@ -0,0 +1,4 @@
@echo off
rem Switch to the directory that contains this script (drive included).
cd /d "%~dp0"
rem Start practice mode: keep answering until there is no next question,
rem with a 500 ms delay after each answered question.
uv run auto-answer practice --answer-count -1 --delay-ms 500
rem Wait for a key press so the console output stays visible.
pause