diff --git a/.env.example b/.env.example index ba0f868..fa86b9b 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,13 @@ -TARGET_URL=https://example.com +LOGIN_URL=https://sia.sinopec.com/mobile/#/login +TARGET_URL=https://sia.sinopec.com/mobile/#/app/autonomous/test/index?fromName=index HEADLESS=false TIMEOUT_MS=30000 +BROWSER_CHANNEL=msedge +USER_DATA_DIR=.auth/edge +STORAGE_STATE_PATH=.auth/storage_state.json +SESSION_STATE_PATH=.auth/session_storage.json +LOGIN_USERNAME= +LOGIN_PASSWORD= +QUESTION_BANK=测录融合 +ANSWER_COUNT=0 +ANSWER_DELAY_MS=0 diff --git a/.gitignore b/.gitignore index 505a3b1..440a0be 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,9 @@ wheels/ # Virtual environments .venv +.env + +# Local browser login state +.auth/ + +.playwright-mcp/ \ No newline at end of file diff --git a/README.md b/README.md index cb8f9c1..4bd20cd 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,38 @@ uv run playwright install chromium ## 运行 ```powershell -uv run auto-answer https://example.com +uv run auto-answer login +uv run auto-answer run +``` + +打开自主练测页,如果登录过期则自动登录: + +```powershell +uv run auto-answer practice +``` + +指定题库关键词,程序会自动选择匹配度最高的题库 tab,并进入顺序练习: + +```powershell +uv run auto-answer practice --bank "测录融合" +``` + +进入顺序练习后自动答指定数量的题: + +```powershell +uv run auto-answer practice --bank "测录融合" --answer-count 10 +``` + +一直答到没有下一题,并在每题后等待 500ms: + +```powershell +uv run auto-answer practice --bank "测录融合" --answer-count -1 --delay-ms 500 +``` + +连续登录并复用同一个 Edge 窗口: + +```powershell +uv run auto-answer start ``` 也可以复制 `.env.example` 为 `.env`,然后直接运行: diff --git a/src/auto_answer/browser.py b/src/auto_answer/browser.py index 93529d8..b3c63b6 100644 --- a/src/auto_answer/browser.py +++ b/src/auto_answer/browser.py @@ -2,8 +2,10 @@ from __future__ import annotations from collections.abc import Iterator from contextlib import contextmanager +import json +from pathlib import Path -from playwright.sync_api import Browser, Page, sync_playwright +from playwright.sync_api import Browser, BrowserContext, Page, sync_playwright from auto_answer.config import Settings @@ -11,10 +13,89 @@ from auto_answer.config import Settings @contextmanager def open_page(settings: Settings) -> Iterator[Page]: with sync_playwright() as playwright: - browser: Browser = playwright.chromium.launch(headless=settings.headless) + context: BrowserContext = playwright.chromium.launch_persistent_context( + user_data_dir=Path(settings.user_data_dir), + channel=settings.browser_channel, + headless=settings.headless, + ) + page = context.pages[0] if context.pages else context.new_page() + page.set_default_timeout(settings.timeout_ms) + try: + yield page + finally: + context.close() + + +@contextmanager +def open_fresh_page(settings: Settings) -> Iterator[Page]: + with sync_playwright() as playwright: + browser: Browser = playwright.chromium.launch( + channel=settings.browser_channel, + headless=settings.headless, + ) page = browser.new_page() page.set_default_timeout(settings.timeout_ms) try: yield page finally: browser.close() + + +@contextmanager +def open_login_page(settings: Settings) -> Iterator[tuple[BrowserContext, Page]]: + with sync_playwright() as playwright: + browser: Browser = playwright.chromium.launch( + channel=settings.browser_channel, + headless=False, + ) + context_options = {} + storage_state_path = Path(settings.storage_state_path) + if storage_state_path.exists(): + context_options["storage_state"] = storage_state_path + + context = browser.new_context(**context_options) + session_state_path = Path(settings.session_state_path) + if session_state_path.exists(): + session_data = json.dumps(json.loads(session_state_path.read_text())) + context.add_init_script( + script=f""" + (() => {{ + const data = {session_data}; + for (const [origin, values] of Object.entries(data)) {{ + if (location.origin !== origin) continue; + for (const [key, value] of Object.entries(values)) {{ + sessionStorage.setItem(key, value); + }} + }} + }})(); + """ + ) + + page = context.new_page() + page.set_default_timeout(settings.timeout_ms) + try: + yield context, page + finally: + browser.close() + + +@contextmanager +def open_page_with_state(settings: Settings) -> Iterator[Page]: + state_path = Path(settings.storage_state_path) + if not state_path.exists(): + raise SystemExit( + f"Missing login state: {state_path}. Run `uv run auto-answer login` first." + ) + + with sync_playwright() as playwright: + browser: Browser = playwright.chromium.launch( + channel=settings.browser_channel, + headless=settings.headless, + ) + context = browser.new_context(storage_state=state_path) + page = context.new_page() + page.set_default_timeout(settings.timeout_ms) + try: + yield page + finally: + browser.close() diff --git a/src/auto_answer/cli.py b/src/auto_answer/cli.py index 260adf1..b1cc228 100644 --- a/src/auto_answer/cli.py +++ b/src/auto_answer/cli.py @@ -6,26 +6,78 @@ from auto_answer.config import Settings from auto_answer.runner import AnswerRunner +def build_settings(args: argparse.Namespace) -> Settings: + values: dict[str, object] = {"headless": args.headless} + if args.url and args.command == "login": + values["login_url"] = args.url + elif args.url: + values["target_url"] = args.url + if getattr(args, "bank", None): + values["question_bank"] = args.bank + if getattr(args, "answer_count", None) is not None: + values["answer_count"] = args.answer_count + if getattr(args, "delay_ms", None) is not None: + values["answer_delay_ms"] = args.delay_ms + return Settings(**values) + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run the auto-answer bot.") - parser.add_argument( - "url", - nargs="?", - help="Target page URL. Falls back to TARGET_URL from .env.", - ) parser.add_argument( "--headless", action="store_true", help="Run browser in headless mode.", ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + login_parser = subparsers.add_parser("login", help="Open Edge and save login state.") + login_parser.add_argument("url", nargs="?", help="Target page URL.") + + start_parser = subparsers.add_parser("start", help="Log in and continue in one Edge session.") + start_parser.add_argument("url", nargs="?", help="Target page URL.") + + run_parser = subparsers.add_parser("run", help="Run the bot with saved login state.") + run_parser.add_argument("url", nargs="?", help="Target page URL.") + + practice_parser = subparsers.add_parser( + "practice", + help="Open practice page, auto-login if needed, and wait.", + ) + practice_parser.add_argument("url", nargs="?", help="Target page URL.") + practice_parser.add_argument("--bank", help="Question bank keyword.") + practice_parser.add_argument( + "--answer-count", + type=int, + help="Number of questions to answer automatically. Use -1 to answer until the end.", + ) + practice_parser.add_argument( + "--delay-ms", + type=int, + help="Delay after each answered question, in milliseconds.", + ) + + open_parser = subparsers.add_parser("open", help="Open the target page and wait.") + open_parser.add_argument("url", nargs="?", help="Target page URL.") + return parser def main() -> None: args = build_parser().parse_args() - settings = Settings(target_url=args.url, headless=args.headless) + settings = build_settings(args) runner = AnswerRunner(settings) - runner.run() + + if args.command == "login": + runner.login() + elif args.command == "start": + runner.start() + elif args.command == "run": + runner.run() + elif args.command == "practice": + runner.practice() + elif args.command == "open": + runner.open() if __name__ == "__main__": diff --git a/src/auto_answer/config.py b/src/auto_answer/config.py index 118dd2c..1d9b401 100644 --- a/src/auto_answer/config.py +++ b/src/auto_answer/config.py @@ -3,6 +3,11 @@ from __future__ import annotations from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict +DEFAULT_LOGIN_URL = "https://sia.sinopec.com/mobile/#/login" +DEFAULT_TARGET_URL = ( + "https://sia.sinopec.com/mobile/#/app/autonomous/test/index?fromName=index" +) + class Settings(BaseSettings): model_config = SettingsConfigDict( @@ -12,6 +17,22 @@ class Settings(BaseSettings): populate_by_name=True, ) - target_url: str | None = Field(default=None, alias="TARGET_URL") + login_url: str = Field(default=DEFAULT_LOGIN_URL, alias="LOGIN_URL") + target_url: str = Field(default=DEFAULT_TARGET_URL, alias="TARGET_URL") headless: bool = Field(default=False, alias="HEADLESS") timeout_ms: int = Field(default=30_000, alias="TIMEOUT_MS") + browser_channel: str = Field(default="msedge", alias="BROWSER_CHANNEL") + user_data_dir: str = Field(default=".auth/edge", alias="USER_DATA_DIR") + storage_state_path: str = Field( + default=".auth/storage_state.json", + alias="STORAGE_STATE_PATH", + ) + session_state_path: str = Field( + default=".auth/session_storage.json", + alias="SESSION_STATE_PATH", + ) + login_username: str | None = Field(default=None, alias="LOGIN_USERNAME") + login_password: str | None = Field(default=None, alias="LOGIN_PASSWORD") + question_bank: str | None = Field(default=None, alias="QUESTION_BANK") + answer_count: int = Field(default=0, alias="ANSWER_COUNT") + answer_delay_ms: int = Field(default=0, alias="ANSWER_DELAY_MS") diff --git a/src/auto_answer/runner.py b/src/auto_answer/runner.py index 82926b7..706a3f2 100644 --- a/src/auto_answer/runner.py +++ b/src/auto_answer/runner.py @@ -1,6 +1,12 @@ from __future__ import annotations -from auto_answer.browser import open_page +from difflib import SequenceMatcher +import json +import re +import time +from pathlib import Path + +from auto_answer.browser import open_fresh_page, open_login_page, open_page_with_state from auto_answer.config import Settings from auto_answer.solver import Solver @@ -10,13 +16,349 @@ class AnswerRunner: self.settings = settings self.solver = Solver() - def run(self) -> None: - if not self.settings.target_url: - raise SystemExit("Missing URL. Pass one argument or set TARGET_URL in .env.") + def login(self) -> None: + state_path = Path(self.settings.storage_state_path) + state_path.parent.mkdir(parents=True, exist_ok=True) - with open_page(self.settings) as page: + with open_login_page(self.settings) as (context, page): + page.goto(self.settings.login_url) + if self.settings.login_username and self.settings.login_password: + print("Edge opened. Filling login form automatically.") + self._submit_login(page) + self._wait_for_login(page) + context.storage_state(path=state_path) + self._save_session_storage(page) + print(f"Login state saved to {state_path}.") + return + + print("Edge opened. Please log in, then close the Edge window.") + while not page.is_closed(): + try: + context.storage_state(path=state_path) + self._save_session_storage(page) + state = self._get_page_state(page) + print( + "saved state: " + f"url={state['url']} " + f"localStorage={state['local_storage_count']} " + f"sessionStorage={state['session_storage_count']}" + ) + except Exception: + break + time.sleep(2) + context.storage_state(path=state_path) + self._save_session_storage(page) + print(f"Login state saved to {state_path}.") + + def _submit_login(self, page) -> None: + username = self.settings.login_username + password = self.settings.login_password + if not username or not password: + raise SystemExit("Missing LOGIN_USERNAME or LOGIN_PASSWORD.") + + page.wait_for_load_state("domcontentloaded") + page.locator("input:not([type='password'])").first.fill(username) + page.locator("input[type='password']").first.fill(password) + + login_button = page.get_by_role("button", name="登 录").first + if login_button.count() > 0: + login_button.click() + return + + login_button = page.get_by_role("button", name="登录").first + if login_button.count() > 0: + login_button.click() + return + + page.locator("button").filter(has_text="登").first.click() + + def _go_login(self, page) -> None: + relogin = page.get_by_role("button", name="重新登录") + if relogin.count() > 0: + relogin.first.click() + time.sleep(1) + + if "/login" not in page.url: + page.goto(self.settings.login_url) + + page.wait_for_load_state("domcontentloaded") + + def _needs_login(self, page) -> bool: + if "/login" in page.url: + return True + + try: + text = page.locator("body").inner_text(timeout=2000) + except Exception: + return False + + return "登录状态已过期" in text or "重新登录" in text + + def start(self) -> None: + with open_fresh_page(self.settings) as page: + page.goto(self.settings.login_url) + print("Edge opened. Please log in. The bot will continue in this same window.") + self._wait_for_login(page) page.goto(self.settings.target_url) + page.wait_for_load_state("networkidle") + self._print_page_summary(page) + print("Keep using this Edge window. Close it when you want to stop the bot.") + page.wait_for_event("close", timeout=0) + + def run(self) -> None: + with open_page_with_state(self.settings) as page: + page.goto(self.settings.target_url) + page.wait_for_load_state("networkidle") + self._print_page_summary(page) question = page.title() answer = self.solver.solve(question) print(f"question: {question}") print(f"answer: {answer}") + + def practice(self) -> None: + state_path = Path(self.settings.storage_state_path) + state_path.parent.mkdir(parents=True, exist_ok=True) + + with open_login_page(self.settings) as (context, page): + self._goto_target_page(page) + + if self._needs_login(page): + print("Login required. Signing in...") + self._go_login(page) + self._submit_login(page) + self._wait_for_login(page) + context.storage_state(path=state_path) + self._save_session_storage(page) + self._goto_target_page(page) + else: + print("Already logged in.") + context.storage_state(path=state_path) + self._save_session_storage(page) + + if self.settings.question_bank: + self._select_question_bank(page, self.settings.question_bank) + + self._start_sequential_practice(page) + if self.settings.answer_count != 0: + self._answer_questions(page, self.settings.answer_count) + + self._print_page_summary(page) + print("Practice page is ready. Close Edge when you are done.") + page.wait_for_event("close", timeout=0) + + def open(self) -> None: + with open_page_with_state(self.settings) as page: + page.goto(self.settings.target_url) + page.wait_for_load_state("networkidle") + self._print_page_summary(page) + print("Page opened. Close Edge when you are done.") + page.wait_for_event("close", timeout=0) + + def _select_question_bank(self, page, keyword: str) -> None: + self._wait_for_question_bank_tabs(page) + tabs = page.get_by_role("tab") + count = tabs.count() + if count == 0: + raise SystemExit("No question bank tabs found.") + + names = [tabs.nth(index).inner_text().strip() for index in range(count)] + best_index, best_name, best_score = self._best_match(keyword, names) + print( + f"Question bank matched: keyword={keyword!r}, " + f"name={best_name!r}, score={best_score:.2f}" + ) + tabs.nth(best_index).click() + page.wait_for_load_state("networkidle") + + def _goto_target_page(self, page) -> None: + page.goto(self.settings.target_url) + page.wait_for_load_state("networkidle") + if self._needs_login(page): + return + self._wait_for_question_bank_tabs(page) + + def _wait_for_question_bank_tabs(self, page) -> None: + page.wait_for_function( + """() => { + const text = document.body.innerText; + const tabs = document.querySelectorAll('[role="tab"]'); + return tabs.length > 0 || text.includes('顺序练习'); + }""", + timeout=self.settings.timeout_ms, + ) + + def _best_match(self, keyword: str, names: list[str]) -> tuple[int, str, float]: + normalized_keyword = self._normalize_text(keyword) + best_index = 0 + best_score = -1.0 + + for index, name in enumerate(names): + normalized_name = self._normalize_text(name) + if normalized_keyword in normalized_name: + score = 1.0 + len(normalized_keyword) / max(len(normalized_name), 1) + else: + score = SequenceMatcher(None, normalized_keyword, normalized_name).ratio() + + if score > best_score: + best_index = index + best_score = score + + return best_index, names[best_index], best_score + + def _normalize_text(self, text: str) -> str: + return "".join(text.lower().split()) + + def _start_sequential_practice(self, page) -> None: + start = page.get_by_text("开始练习", exact=True).first + if start.count() == 0: + raise SystemExit("Sequential practice button not found.") + + start.click() + page.wait_for_url("**/practice/topics**") + page.wait_for_load_state("networkidle") + + def _answer_questions(self, page, count: int) -> None: + answered = 0 + while count < 0 or answered < count: + answer = self._reveal_and_read_answer(page) + total = "until end" if count < 0 else str(count) + print(f"Answering question {answered + 1}/{total}: {answer}") + self._select_answer_options(page, answer) + if not self._go_next_question(page): + print("No next question detected. Stopping.") + return + answered += 1 + if self.settings.answer_delay_ms > 0: + time.sleep(self.settings.answer_delay_ms / 1000) + + def _reveal_and_read_answer(self, page) -> str: + body = page.locator("body") + text = body.inner_text(timeout=self.settings.timeout_ms) + if "正确答案:" not in text: + page.get_by_text("答案解析", exact=True).click() + page.wait_for_function( + "() => document.body.innerText.includes('正确答案:')", + timeout=self.settings.timeout_ms, + ) + text = body.inner_text(timeout=self.settings.timeout_ms) + + match = re.search(r"正确答案[::]\s*([A-Z,,、\s]+)", text) + if not match: + raise SystemExit("Correct answer not found after opening answer analysis.") + + return "".join(re.findall(r"[A-Z]", match.group(1).upper())) + + def _select_answer_options(self, page, answer: str) -> None: + if not answer: + raise SystemExit("Empty answer.") + + for letter in answer: + option = page.locator("li").filter(has_text=re.compile(rf"^\s*{letter}\s*\.")) + if option.count() == 0: + option = page.get_by_text(re.compile(rf"^\s*{letter}\s*\.")).locator("..") + if option.count() == 0: + raise SystemExit(f"Option not found: {letter}") + option.first.click() + time.sleep(0.2) + + def _go_next_question(self, page) -> bool: + before = self._current_question_no(page) + next_button = page.get_by_text("下一题", exact=True) + if next_button.count() == 0: + return False + + try: + next_button.click(timeout=3000) + page.wait_for_function( + r"""before => { + const text = document.body.innerText; + const match = text.match(/(\d+)\./); + return match && match[1] !== before; + }""", + arg=before, + timeout=self.settings.timeout_ms, + ) + return True + except Exception: + return False + + def _current_question_no(self, page) -> str: + text = page.locator("body").inner_text(timeout=self.settings.timeout_ms) + match = re.search(r"(\d+)\.", text) + return match.group(1) if match else "" + + def _wait_for_login(self, page) -> None: + last_state = "" + while True: + state = self._get_page_state(page) + if state != last_state: + print( + "state: " + f"url={state['url']} " + f"title={state['title']} " + f"text={state['text']} " + f"localStorage={state['local_storage_count']} " + f"sessionStorage={state['session_storage_count']}" + ) + last_state = state + + if self._looks_logged_in(state): + page.wait_for_load_state("networkidle") + print("Login detected.") + return + + time.sleep(2) + + def _looks_logged_in(self, state: dict[str, str | int]) -> bool: + url = str(state["url"]) + text = str(state["text"]) + has_storage = ( + int(state["local_storage_count"]) > 0 + or int(state["session_storage_count"]) > 0 + ) + left_login_route = "/login" not in url + has_app_text = any(word in text for word in ("考试", "答题", "学习", "自主", "测试")) + return left_login_route or (has_storage and has_app_text) + + def _get_page_state(self, page) -> dict[str, str | int]: + text = "" + try: + text = " ".join(page.locator("body").inner_text(timeout=1000).split())[:200] + except Exception: + pass + + storage = page.evaluate( + """() => ({ + localStorageCount: localStorage.length, + sessionStorageCount: sessionStorage.length + })""" + ) + return { + "url": page.url, + "title": page.title(), + "text": text, + "local_storage_count": storage["localStorageCount"], + "session_storage_count": storage["sessionStorageCount"], + } + + def _save_session_storage(self, page) -> None: + state_path = Path(self.settings.session_state_path) + state_path.parent.mkdir(parents=True, exist_ok=True) + data = page.evaluate( + """() => { + const values = {}; + for (let index = 0; index < sessionStorage.length; index += 1) { + const key = sessionStorage.key(index); + values[key] = sessionStorage.getItem(key); + } + return { [location.origin]: values }; + }""" + ) + state_path.write_text(json.dumps(data, ensure_ascii=False, indent=2)) + + def _print_page_summary(self, page) -> None: + text = page.locator("body").inner_text(timeout=self.settings.timeout_ms) + summary = " ".join(text.split())[:1000] + print(f"url: {page.url}") + print(f"title: {page.title()}") + print(f"text: {summary}") diff --git a/start.bat b/start.bat new file mode 100644 index 0000000..a5141eb --- /dev/null +++ b/start.bat @@ -0,0 +1,4 @@ +@echo off +cd /d "%~dp0" +uv run auto-answer practice --answer-count -1 --delay-ms 500 +pause