Files
fast-xray/src/easy/state.rs
T
chuan 096e70fe54 feat: add a zero-install web UI for easy over an extracted engine
- extract the headless search out of `easy` into an engine that
  measures the baseline, resolves the target, and drives the
  producer/consumer loop, so the CLI and web share one search
- make `State` fully observable — structured discovery feed, run phase,
  baseline/target, and final outcome — and render the same snapshots two
  ways: the in-place terminal panel and an SSE stream
- serve a small embedded page (HTML/JS/Pico baked in via include_str!)
  on a hand-rolled hyper service: POST /api/run runs one search at a
  time, GET /api/events streams live snapshots as JSON until it settles
- open the UI on a bare invocation (double-clicked binary); `web` with
  --host/--port/--no-open controls it explicitly
- bump to 0.2.0
2026-06-26 10:51:23 +08:00

311 lines
9.5 KiB
Rust

//! Shared, observable state for an `easy` run: the validated queue, the confirm
//! queue, and the live testing/feed/recent views, all behind one [`State`]
//! handle whose methods are the *only* way to touch the bucket. A plain mutex
//! (never held across `.await`) is enough: producer and consumer drive it
//! through these methods; a renderer (the terminal panel) or the web SSE loop
//! reads a [`Snapshot`].
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use super::{FEED, RECENT};
/// The IP that won the search.
///
/// Bundles the address with the latency that was carried along with it and
/// the speed that was measured for it.
pub(crate) struct Found {
    /// Address of the winning IP.
    pub(crate) ip: IpAddr,
    /// Latency carried with the address.
    pub(crate) latency: Duration,
    /// Measured speed, in MB/s.
    pub(crate) mbs: f64,
}
/// High-level stage of a run, as surfaced to the web UI.
///
/// `confirming` and `recalibrating` are not phases of their own: they are
/// finer-grained flags layered on top of `Searching`.
#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum Phase {
    /// The phase a fresh `State` starts in.
    Measuring,
    /// The phase that the `confirming` / `recalibrating` flags refine.
    Searching,
    /// The run has settled, with or without a winner.
    Done,
}
/// The discovery stage a feed line came from.
#[derive(Copy, Clone)]
pub(crate) enum FeedStage {
    /// The line was produced by the ping stage.
    Ping,
    /// The line was produced by the latency stage.
    Latency,
}
/// The result a feed line reports.
#[derive(Copy, Clone)]
pub(crate) enum FeedState {
    /// The check succeeded.
    Ok,
    /// The check completed but was judged slow.
    Slow,
    /// The check failed.
    Fail,
}
/// A single structured line of the discovery feed.
///
/// This is the one source of truth for both front ends: the terminal panel
/// formats it into ANSI, and the web layer serializes it to JSON.
#[derive(Clone)]
pub(crate) struct FeedEntry {
    /// Discovery stage that produced the line.
    pub(crate) stage: FeedStage,
    /// Address the line is about.
    pub(crate) ip: IpAddr,
    /// Optional millisecond figure for the line.
    // NOTE(review): presumably the measured ping/latency time — confirm against the producer.
    pub(crate) ms: Option<f64>,
    /// How the line turned out.
    pub(crate) state: FeedState,
    /// Optional free-form note attached to the line.
    pub(crate) note: Option<String>,
}
/// How a run ended; the web UI receives it as the final SSE frame.
#[derive(Clone)]
pub(crate) enum Outcome {
    /// The run has not settled yet.
    Pending,
    /// A winning node was found.
    Found {
        /// Address of the winner.
        ip: IpAddr,
        /// Measured speed of the winner, in MB/s.
        mbs: f64,
        /// Latency of the winner, in milliseconds.
        latency_ms: f64,
        /// URL recorded together with the winner.
        url: String,
    },
    /// The run settled without a fast-enough IP.
    NotFound,
}
/// An owned, point-in-time copy of everything an observer draws.
pub(crate) struct Snapshot {
    /// Coarse run phase.
    pub(crate) phase: Phase,
    /// Measured direct (no-proxy) speed in MB/s, if one is recorded.
    pub(crate) baseline: Option<f64>,
    /// Current target speed in MB/s.
    pub(crate) target: f64,
    /// IPs under test at the moment of the snapshot.
    pub(crate) testing: Vec<IpAddr>,
    /// Discovery feed lines, in the order they were pushed (newest last).
    pub(crate) feed: Vec<FeedEntry>,
    /// Finished speed tests as `(ip, speed, passed)`, newest first.
    pub(crate) recent: Vec<(IpAddr, Option<f64>, bool)>,
    /// Total number of validated IPs minted so far.
    pub(crate) valid: usize,
    /// Number of validated IPs still waiting in the queue.
    pub(crate) queued: usize,
    /// Whether a solo confirm is running.
    pub(crate) confirming: bool,
    /// Whether a direct re-measure is running.
    pub(crate) recalibrating: bool,
    /// Final result; `Outcome::Pending` until the run settles.
    pub(crate) outcome: Outcome,
}
/// A completed speed test, retained for the "recent" panel.
struct Finished {
    /// Address that was tested.
    ip: IpAddr,
    /// Speed recorded for the test, if any.
    mbs: Option<f64>,
    /// Whether the test passed.
    pass: bool,
}
/// The mutable data that `State` keeps behind its mutex.
struct Bucket {
    /// Validated IPs awaiting a concurrent screen.
    queue: VecDeque<(IpAddr, Duration)>,
    /// Promising IPs (≥ target/n) awaiting a solo confirm.
    confirm: VecDeque<(IpAddr, Duration)>,
    /// IPs under test right now (screen or confirm).
    testing: Vec<IpAddr>,
    /// The last `FEED` discovery lines (the producer's ping/latency output).
    feed: VecDeque<FeedEntry>,
    /// The last `RECENT` finished tests, oldest first.
    recent: VecDeque<Finished>,
    /// Total IPs minted; gates `--max` and feeds the header.
    valid: usize,
    /// Count of tested-and-rejected IPs; drives recalibration.
    fails: usize,
    /// Coarse phase for the web UI.
    phase: Phase,
    /// Measured direct (no-proxy) speed, in MB/s.
    baseline: Option<f64>,
    /// Current target in MB/s; recalibrated mid-run.
    target: f64,
    /// A solo confirm is running (pauses screens, tints the UI).
    confirming: bool,
    /// A direct re-measure is running.
    recalibrating: bool,
    /// Set once the producer has finished.
    producer_done: bool,
    /// Set once the search has been won.
    found: bool,
    /// Final result, once the run settles.
    outcome: Outcome,
}
/// Cloneable handle to the shared bucket. All locking lives here.
///
/// The handle wraps an `Arc<Mutex<Bucket>>`, so every clone refers to the
/// same underlying bucket.
#[derive(Clone)]
pub(crate) struct State(Arc<Mutex<Bucket>>);
impl State {
    /// Creates a fresh state.
    ///
    /// All queues and views start empty, the counters start at zero, the
    /// phase is `Measuring` and the outcome is `Pending`. The target starts at
    /// 0 and is set once the baseline is measured (see [`State::set_target`]).
    pub(crate) fn new() -> Self {
        let bucket = Bucket {
            queue: VecDeque::new(),
            confirm: VecDeque::new(),
            testing: Vec::new(),
            feed: VecDeque::new(),
            recent: VecDeque::new(),
            valid: 0,
            fails: 0,
            phase: Phase::Measuring,
            baseline: None,
            target: 0.0,
            confirming: false,
            recalibrating: false,
            producer_done: false,
            found: false,
            outcome: Outcome::Pending,
        };
        State(Arc::new(Mutex::new(bucket)))
    }

    /// Acquires the bucket lock; every other method goes through here.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. an earlier holder panicked while
    /// it held the guard.
    fn lock(&self) -> MutexGuard<'_, Bucket> {
        self.0
            .lock()
            .expect("easy state mutex poisoned: a holder panicked while updating the bucket")
    }

    /// Shared producer stop condition: the search is already won, or the
    /// number of minted IPs has reached `max_valid`.
    fn producer_stopped(b: &Bucket, max_valid: usize) -> bool {
        b.found || b.valid >= max_valid
    }

    // --- reads ---

    /// Returns the current target speed in MB/s.
    pub(super) fn target(&self) -> f64 {
        self.lock().target
    }

    /// Returns the total number of validated IPs minted so far.
    pub(crate) fn valid(&self) -> usize {
        self.lock().valid
    }

    /// Returns `true` while at least one IP is waiting for a solo confirm.
    pub(super) fn confirm_pending(&self) -> bool {
        !self.lock().confirm.is_empty()
    }

    /// True once the producer is finished and nothing remains to test, i.e.
    /// both the validated queue and the confirm queue are empty.
    pub(super) fn is_drained(&self) -> bool {
        let b = self.lock();
        b.producer_done && b.queue.is_empty() && b.confirm.is_empty()
    }

    /// Whether a direct-link recalibration is due.
    ///
    /// Returns `true` when the failure count is a non-zero multiple of `every`
    /// and the search has not been won. An `every` of 0 means "never due".
    pub(super) fn recalibration_due(&self, every: usize) -> bool {
        // Checked before locking: a remainder by zero would panic while the
        // guard is held, poisoning the mutex for every other user of the state.
        if every == 0 {
            return false;
        }
        let b = self.lock();
        !b.found && b.fails > 0 && b.fails % every == 0
    }

    // --- phase / baseline / target (driven by the engine) ---

    /// Sets the coarse run phase.
    pub(crate) fn set_phase(&self, phase: Phase) {
        self.lock().phase = phase;
    }

    /// Records the measured baseline speed (or `None`).
    pub(crate) fn set_baseline(&self, baseline: Option<f64>) {
        self.lock().baseline = baseline;
    }

    /// Sets the target speed in MB/s.
    pub(crate) fn set_target(&self, target: f64) {
        self.lock().target = target;
    }

    /// Records the winning node and settles the run (phase `Done`).
    pub(crate) fn finish_found(&self, ip: IpAddr, mbs: f64, latency_ms: f64, url: String) {
        let mut b = self.lock();
        b.phase = Phase::Done;
        b.outcome = Outcome::Found { ip, mbs, latency_ms, url };
    }

    /// Settles the run with no fast-enough IP found (phase `Done`).
    pub(crate) fn finish_not_found(&self) {
        let mut b = self.lock();
        b.phase = Phase::Done;
        b.outcome = Outcome::NotFound;
    }

    // --- consumer ---

    /// Takes the oldest validated IP off the queue, if any.
    pub(super) fn pop_queue(&self) -> Option<(IpAddr, Duration)> {
        self.lock().queue.pop_front()
    }

    /// Takes the oldest IP off the confirm queue, if any.
    pub(super) fn pop_confirm(&self) -> Option<(IpAddr, Duration)> {
        self.lock().confirm.pop_front()
    }

    /// Appends an IP and its latency to the confirm queue.
    pub(super) fn push_confirm(&self, ip: IpAddr, lat: Duration) {
        self.lock().confirm.push_back((ip, lat));
    }

    /// Replaces the set of IPs shown as under test with `ips`.
    pub(super) fn set_testing(&self, ips: &[IpAddr]) {
        let mut b = self.lock();
        // Refill in place so the existing allocation is reused.
        b.testing.clear();
        b.testing.extend_from_slice(ips);
    }

    /// Flags that a solo confirm is running.
    pub(super) fn begin_confirm(&self) {
        self.lock().confirming = true;
    }

    /// Clears the confirm flag and empties the under-test list.
    pub(super) fn end_confirm(&self) {
        let mut b = self.lock();
        b.confirming = false;
        b.testing.clear();
    }

    /// Marks the search won, records the winner in the recent panel as a
    /// pass, and empties the under-test list.
    pub(super) fn mark_found(&self, ip: IpAddr, mbs: f64) {
        let mut b = self.lock();
        b.found = true;
        push_recent(&mut b, ip, Some(mbs), true);
        b.testing.clear();
    }

    /// Records a rejected IP in the recent panel and bumps the failure counter.
    pub(super) fn record_fail(&self, ip: IpAddr, mbs: Option<f64>) {
        let mut b = self.lock();
        b.fails += 1;
        push_recent(&mut b, ip, mbs, false);
    }

    /// Flags that a recalibration is running and empties the under-test list.
    pub(super) fn begin_recalibrate(&self) {
        let mut b = self.lock();
        b.recalibrating = true;
        b.testing.clear();
    }

    /// Clears the recalibration flag; when `new_target` is `Some`, it becomes
    /// the current target, otherwise the target is left unchanged.
    pub(super) fn end_recalibrate(&self, new_target: Option<f64>) {
        let mut b = self.lock();
        b.recalibrating = false;
        if let Some(t) = new_target {
            b.target = t;
        }
    }

    // --- producer ---

    /// Stop signal plus a one-lock read of `(queue_len, valid)`; `None` means the
    /// producer should stop (found, or the valid cap is reached).
    pub(super) fn refill_check(&self, max_valid: usize) -> Option<(usize, usize)> {
        let b = self.lock();
        if Self::producer_stopped(&b, max_valid) {
            return None;
        }
        Some((b.queue.len(), b.valid))
    }

    /// Enqueues a validated IP unless the producer has already been stopped
    /// (found, or the valid cap is reached); returns whether it was minted.
    pub(super) fn try_enqueue(&self, ip: IpAddr, lat: Duration, max_valid: usize) -> bool {
        let mut b = self.lock();
        if Self::producer_stopped(&b, max_valid) {
            return false;
        }
        b.queue.push_back((ip, lat));
        b.valid += 1;
        true
    }

    /// Pushes one entry onto the discovery feed (newest last), dropping the
    /// oldest entries so that at most `FEED` remain.
    pub(super) fn push_feed(&self, entry: FeedEntry) {
        let mut b = self.lock();
        b.feed.push_back(entry);
        while b.feed.len() > FEED {
            b.feed.pop_front();
        }
    }

    /// Marks the producer as finished.
    pub(super) fn set_producer_done(&self) {
        self.lock().producer_done = true;
    }

    // --- observers (terminal panel / web SSE) ---

    /// Copies everything an observer draws into an owned [`Snapshot`] under a
    /// single lock acquisition. `recent` is reversed so the newest finished
    /// test comes first; `feed` keeps its newest-last order.
    pub(crate) fn snapshot(&self) -> Snapshot {
        let b = self.lock();
        Snapshot {
            phase: b.phase,
            baseline: b.baseline,
            target: b.target,
            testing: b.testing.clone(),
            feed: b.feed.iter().cloned().collect(),
            recent: b.recent.iter().rev().map(|f| (f.ip, f.mbs, f.pass)).collect(),
            valid: b.valid,
            queued: b.queue.len(),
            confirming: b.confirming,
            recalibrating: b.recalibrating,
            outcome: b.outcome.clone(),
        }
    }
}
/// Appends one finished speed test to the bucket's `recent` list, then drops
/// entries from the front (the oldest) until no more than `RECENT` remain.
fn push_recent(b: &mut Bucket, ip: IpAddr, mbs: Option<f64>, pass: bool) {
    let entry = Finished { ip, mbs, pass };
    b.recent.push_back(entry);

    // Number of entries beyond the cap; zero when the list still fits.
    let overflow = b.recent.len().saturating_sub(RECENT);
    b.recent.drain(..overflow);
}