From 9de35210b757f4d20c2a71157cbd434894e4d364 Mon Sep 17 00:00:00 2001 From: chuan Date: Fri, 26 Jun 2026 01:51:23 +0800 Subject: [PATCH] feat: rework easy with pipelined discovery, solo-confirm, and a live panel --- README.md | 6 +- src/easy.rs | 467 ------------------------------------------- src/easy/consumer.rs | 139 +++++++++++++ src/easy/mod.rs | 172 ++++++++++++++++ src/easy/producer.rs | 131 ++++++++++++ src/easy/state.rs | 223 +++++++++++++++++++++ src/easy/ui.rs | 189 +++++++++++++++++ 7 files changed, 857 insertions(+), 470 deletions(-) delete mode 100644 src/easy.rs create mode 100644 src/easy/consumer.rs create mode 100644 src/easy/mod.rs create mode 100644 src/easy/producer.rs create mode 100644 src/easy/state.rs create mode 100644 src/easy/ui.rs diff --git a/README.md b/README.md index d8a69c9..e1d1476 100644 --- a/README.md +++ b/README.md @@ -112,9 +112,9 @@ auto 使用各阶段默认的超时、并发、丢包等参数;需精调时单 ### easy — `fast-xray easy [NODE] --speed [OPTIONS]` -只要一个够快的 IP 就停。后台持续用 `ping + latency` 往池子(桶)里补充有效 IP;前台每轮并发筛一批(5 个),并发下就达标的直接命中,有潜力的(≥ `目标/5`)再单独确认,第一个达到 `--speed` 的即为结果,直接输出可复制的节点。 +只要一个够快的 IP 就停。后台持续用 `ping + latency` 补充有效 IP 队列(低于 30 时补到 50);前台用 5 个独立 worker 并发筛,谁测完谁立刻取下一个,互不等待。并发下直接达到 `--speed` 的直接命中;有潜力的(≥ `目标/5`)记入确认队列,等手头这批并发跑完后再**逐个单独(无并发)**测速确认真实速度,第一个达标的即为结果,直接输出可复制的节点。 -先直连国内镜像(清华 TUNA)测一次本机不走代理的下载速度,作为带宽上限:`--speed` 高于它直接判定不可能;镜像偶发不可用则跳过、照常搜索。 +先直连国内镜像(清华 TUNA)做一次 8 秒的不走代理下载测速,作为带宽上限:`--speed` 高于它直接判定不可能;镜像偶发不可用则跳过、照常搜索。搜索中每累计 100 次失败会重测一次直连速度,本机链路变慢时按新值下调默认目标(显式 `--speed` 则不动)。 | 参数 | 默认 | 说明 | | --- | --- | --- | @@ -124,7 +124,7 @@ auto 使用各阶段默认的超时、并发、丢包等参数;需精调时单 | `-6, --ipv6` | off | 同时搜索 IPv6 | | `-o, --output` | result | 输出目录(写 result.txt) | -固定参数:ping 并发 100、≤200ms;latency 并发 10、≤200ms;测速并发筛 5 个再单独确认、每次上限 5s。 +固定参数:ping 并发 100、≤200ms;latency 并发 10、≤200ms;测速 5 路并发筛、有潜力的单独确认、每次上限 5s;有效队列水位 30/50。 ### ping — `fast-xray ping [OPTIONS]` diff --git a/src/easy.rs b/src/easy.rs deleted file mode 100644 index 2bade99..0000000 --- a/src/easy.rs +++ /dev/null @@ -1,467 +0,0 @@ -//! `easy`: find one IP fast enough to hit a target speed, then stop. -//! -//! A producer keeps a bounded *bucket* of validated IPs (those that pass ping + -//! latency) topped up. A consumer pulls a batch and screens it concurrently: -//! under n-way contention each stream gets roughly link/n, so anything reaching -//! `target/n` is worth a solo confirm and anything already at `target` is a -//! guaranteed pass. The promising ones are then re-tested single-threaded (for -//! an accurate number) until one clears `target`. A live "bucket / recent" view -//! shows what's queued, what's being tested, and the last few finished results. - -use std::collections::VecDeque; -use std::net::IpAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use anyhow::{Result, anyhow}; -use console::style; -use futures::future::join_all; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; - -use crate::cli::EasyArgs; -use crate::cloudflare::{self, CfRanges, Family}; -use crate::latency; -use crate::ping; -use crate::report::{print_easy_found, resolve_node, write_file}; -use crate::speed; -use crate::vless::VlessNode; - -// Fixed knobs — the point of `easy` is to not expose these. -const POOL: usize = 10; // bucket capacity (testing + waiting), also the live row count -const BATCH: usize = 5; // IPs screened concurrently per round -const RECENT: usize = 5; // finished results kept on screen for review -const PING_CONCURRENCY: usize = 100; -const PING_TIMEOUT: f64 = 3.0; -const PING_MIN_MS: f64 = 10.0; -const PING_MAX_MS: f64 = 200.0; -const LAT_CONCURRENCY: usize = 10; -const LAT_TIMEOUT: f64 = 5.0; -const LAT_MAX_MS: f64 = 200.0; -const SPEED_TIMEOUT: f64 = 5.0; -const SPEED_BYTES: u64 = 50_000_000; // file is large; time/steady-state caps first -const MAX_BARREN_ROUNDS: usize = 8; // give up after this many empty discovery rounds -const DEFAULT_SPEED_FRACTION: f64 = 0.80; // default target = this × direct speed -const POLL: Duration = Duration::from_millis(50); // producer/consumer idle poll -const FRAME: Duration = Duration::from_millis(120); // render tick -const FRAMES: [&str; 10] = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; - -/// One finished speed test, kept for the "recent" panel. -struct Finished { - ip: IpAddr, - mbs: Option, - pass: bool, -} - -/// Shared, observable bucket. A plain mutex (never held across `.await`) is -/// enough: producer and consumer poll it, the renderer reads snapshots. -struct Bucket { - waiting: VecDeque<(IpAddr, Duration)>, // validated, not yet speed-tested - testing: Vec, // the batch being screened / confirmed - recent: VecDeque, // last RECENT finished, oldest first - valid: usize, // total minted (gates --max + header) - producer_done: bool, - found: bool, -} - -type Shared = Arc>; - -pub(crate) async fn run(args: EasyArgs) -> Result<()> { - let node = Arc::new(resolve_node(&args.node_file, &args.node)?); - if matches!(args.speed, Some(s) if s <= 0.0) { - return Err(anyhow!("--speed must be greater than 0")); - } - - // 0. Direct (no-proxy) speed via a domestic mirror — the link ceiling, and - // the source of the default target when --speed is omitted. - let spinner = spin("Measuring direct (no-proxy) download speed…"); - let baseline = speed::measure_direct(secs(SPEED_TIMEOUT), SPEED_BYTES).await; - match &baseline { - Ok(mbs) => spinner.finish_with_message(format!( - "{} direct download speed: {} MB/s", - style("✓").green().bold(), - style(format!("{mbs:.2}")).cyan() - )), - Err(_) => spinner.finish_with_message(format!( - "{} direct speed unavailable (direct access blocked?)", - style("!").yellow().bold() - )), - } - - // Resolve the target: explicit --speed, else a fraction of the measured - // direct speed. Reject a target the local link can't reach; with neither a - // target nor a measured link there's nothing to aim for. - let (target, note) = match (args.speed, baseline) { - (Some(s), Ok(base)) if s > base => { - return Err(anyhow!( - "direct speed is only {base:.2} MB/s — can't find a node ≥ {s:.2} MB/s" - )); - } - (Some(s), _) => (s, String::new()), - (None, Ok(base)) => ( - base * DEFAULT_SPEED_FRACTION, - format!(" ({:.0}% of direct {base:.2})", DEFAULT_SPEED_FRACTION * 100.0), - ), - (None, Err(_)) => { - return Err(anyhow!( - "couldn't measure direct speed to pick a default target — pass --speed " - )); - } - }; - - let family = if args.ipv6 { Family::Both } else { Family::V4 }; - let spinner = spin("Fetching Cloudflare ranges…"); - let ranges = Arc::new(cloudflare::fetch_ranges(family).await?); - spinner.finish_with_message(format!( - "{} Cloudflare ranges: {} v4, {} v6", - style("✓").green().bold(), - ranges.v4.len(), - ranges.v6.len() - )); - eprintln!( - "{} target ≥ {} MB/s{}, up to {} valid IPs", - style("easy").bold().cyan(), - style(format!("{target:.2}")).cyan(), - style(note).dim(), - args.max - ); - - let shared: Shared = Arc::new(Mutex::new(Bucket { - waiting: VecDeque::new(), - testing: Vec::new(), - recent: VecDeque::new(), - valid: 0, - producer_done: false, - found: false, - })); - let producer = tokio::spawn(produce(ranges.clone(), node.clone(), args.max, args.ipv6, shared.clone())); - let stop = Arc::new(AtomicBool::new(false)); - let ui = tokio::spawn(render_loop(shared.clone(), BucketView::new(), target, args.max, stop.clone())); - - // Consumer: pull a batch, screen it concurrently, then confirm the - // promising ones single-threaded (for an accurate number). - let mut hit: Option<(IpAddr, Duration, f64)> = None; - let mut tested = 0usize; - // What the locked acquire step decided — kept tiny so the guard is released - // before any await. - enum Step { - Stop, - Wait, - Go(Vec<(IpAddr, Duration)>), - } - 'consume: loop { - let step = { - let mut b = shared.lock().unwrap(); - let n = b.waiting.len().min(BATCH); - if b.found || (n == 0 && b.producer_done) { - Step::Stop - } else if n == 0 { - Step::Wait - } else { - let batch: Vec<_> = b.waiting.drain(..n).collect(); - b.testing = batch.iter().map(|(ip, _)| *ip).collect(); - Step::Go(batch) - } - }; - let batch = match step { - Step::Stop => break, - Step::Wait => { - tokio::time::sleep(POLL).await; - continue; - } - Step::Go(batch) => batch, - }; - let n = batch.len(); - tested += n; - - // Screen the whole batch at once. - let screened = join_all(batch.into_iter().map(|(ip, lat)| { - let node = node.clone(); - async move { - let r = - speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await; - (ip, lat, r) - } - })) - .await; - - // Anything at target/n is promising; below that (or failed) is dropped. - let floor = target / n as f64; - let mut candidates: Vec<(IpAddr, Duration, f64)> = Vec::new(); - { - let mut b = shared.lock().unwrap(); - for (ip, lat, r) in screened { - match r { - Ok(s) if s >= floor => candidates.push((ip, lat, s)), - Ok(s) => b.recent.push_back(Finished { ip, mbs: Some(s), pass: false }), - Err(_) => b.recent.push_back(Finished { ip, mbs: None, pass: false }), - } - } - while b.recent.len() > RECENT { - b.recent.pop_front(); - } - b.testing = candidates.iter().map(|(ip, _, _)| *ip).collect(); - } - - // Confirm promising IPs solo, fastest screen result first. - candidates.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal)); - for (ip, lat, _) in candidates { - if shared.lock().unwrap().found { - break 'consume; - } - let r = speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await; - let (mbs, pass) = match &r { - Ok(s) => (Some(*s), *s >= target), - Err(_) => (None, false), - }; - let mut b = shared.lock().unwrap(); - b.testing.retain(|x| *x != ip); - b.recent.push_back(Finished { ip, mbs, pass }); - while b.recent.len() > RECENT { - b.recent.pop_front(); - } - if pass { - b.found = true; - hit = Some((ip, lat, mbs.unwrap())); - break 'consume; - } - } - } - - stop.store(true, Ordering::Relaxed); - let _ = ui.await; // render loop clears the view before returning - producer.abort(); - - match hit { - Some((ip, latency, mbs)) => { - let alias = format!("{mbs:.2}MB-{:.0}ms-{}", latency.as_secs_f64() * 1000.0, ip); - let url = node.to_url(ip, &alias); - print_easy_found(ip, mbs, latency, &url); - write_file(&args.output.join("result.txt"), &format!("{url}\n"))?; - } - None => eprintln!( - "{} no IP reached {target:.2} MB/s after testing {tested} valid IP(s)", - style("✗").red().bold() - ), - } - Ok(()) -} - -/// Background producer: ping → latency to mint validated IPs into the bucket, -/// blocking while the bucket is full, until `max_valid` are minted, the -/// consumer signals `found`, or the source dries up. -async fn produce( - ranges: Arc, - node: Arc, - max_valid: usize, - ipv6: bool, - shared: Shared, -) { - let mut barren = 0usize; - 'outer: loop { - { - let b = shared.lock().unwrap(); - if b.found || b.valid >= max_valid { - break; - } - } - let cfg = ping::PingConfig { - count: POOL, - timeout: secs(PING_TIMEOUT), - concurrency: PING_CONCURRENCY, - ipv6, - port: 443, - max_probe: POOL.saturating_mul(200), - times: 1, - min_latency: millis(PING_MIN_MS), - max_latency: millis(PING_MAX_MS), - max_loss: 0.0, - }; - let reachable = match ping::run(ranges.as_ref(), &cfg, |_| {}).await { - Ok(r) => r, - Err(_) => break, - }; - if reachable.is_empty() { - barren += 1; - if barren >= MAX_BARREN_ROUNDS { - break; - } - continue; - } - let ips: Vec = reachable.iter().map(|r| r.ip).collect(); - let passed = latency::run( - node.as_ref(), - &ips, - LAT_CONCURRENCY, - secs(LAT_TIMEOUT), - millis(LAT_MAX_MS), - 0, - |_| {}, - ) - .await; - - let mut minted = 0usize; - for r in passed { - if !enqueue(&shared, (r.ip, r.latency), max_valid).await { - break 'outer; // found, or hit the valid cap - } - minted += 1; - } - if minted == 0 { - barren += 1; - if barren >= MAX_BARREN_ROUNDS { - break; - } - } else { - barren = 0; - } - } - shared.lock().unwrap().producer_done = true; -} - -/// Wait for a free bucket slot and enqueue `item`; return false if we should -/// stop minting (consumer found a hit, or the valid cap is reached). -async fn enqueue(shared: &Shared, item: (IpAddr, Duration), max_valid: usize) -> bool { - loop { - { - let mut b = shared.lock().unwrap(); - if b.found || b.valid >= max_valid { - return false; - } - if b.waiting.len() + b.testing.len() < POOL { - b.waiting.push_back(item); - b.valid += 1; - return true; - } - } - tokio::time::sleep(POLL).await; - } -} - -/// Periodically redraw the bucket view until `stop`, then clear it. -async fn render_loop(shared: Shared, view: BucketView, target: f64, max: usize, stop: Arc) { - let mut tick = 0usize; - loop { - view.render(&shared, target, max, tick); - if stop.load(Ordering::Relaxed) { - break; - } - tick += 1; - tokio::time::sleep(FRAME).await; - } - view.clear(); -} - -/// The fixed, in-place "bucket / recent" panel: a header line, the bucket rows, -/// and the recent-results rows, each an indicatif line whose message we set. -struct BucketView { - _mp: MultiProgress, - header: ProgressBar, - bucket_label: ProgressBar, - bucket: Vec, - recent_label: ProgressBar, - recent: Vec, -} - -impl BucketView { - fn new() -> Self { - let mp = MultiProgress::new(); - let line = |mp: &MultiProgress| { - let pb = mp.add(ProgressBar::new(1)); - pb.set_style(ProgressStyle::with_template("{msg}").unwrap()); - pb - }; - let header = line(&mp); - let bucket_label = line(&mp); - let bucket = (0..POOL).map(|_| line(&mp)).collect(); - let recent_label = line(&mp); - let recent = (0..RECENT).map(|_| line(&mp)).collect(); - bucket_label.set_message(format!(" {}", style("bucket").dim())); - recent_label.set_message(format!(" {}", style("recent").dim())); - Self { _mp: mp, header, bucket_label, bucket, recent_label, recent } - } - - fn render(&self, shared: &Shared, target: f64, max: usize, tick: usize) { - let frame = FRAMES[tick % FRAMES.len()]; - let (testing, waiting, recent, valid) = { - let b = shared.lock().unwrap(); - ( - b.testing.clone(), - b.waiting.iter().map(|(ip, _)| *ip).collect::>(), - b.recent - .iter() - .rev() // newest first - .map(|f| (f.ip, f.mbs, f.pass)) - .collect::>(), - b.valid, - ) - }; - - self.header.set_message(format!( - "{} {}/{} valid target ≥ {:.2} MB/s", - style(frame).cyan(), - valid, - max, - target - )); - - // Bucket rows: the batch under test (spinner), then the ones waiting. - let mut rows: Vec = Vec::new(); - for ip in &testing { - rows.push(format!( - " {} {:<15} {}", - style(frame).cyan(), - ip.to_string(), - style("testing…").cyan() - )); - } - for ip in &waiting { - rows.push(format!(" {:<15} {}", ip.to_string(), style("waiting").dim())); - } - for (pb, row) in self.bucket.iter().zip(rows.iter().chain(std::iter::repeat(&String::new()))) { - pb.set_message(row.clone()); - } - - // Recent results, newest first. - for (i, pb) in self.recent.iter().enumerate() { - match recent.get(i) { - Some((ip, mbs, pass)) => { - let mark = if *pass { style("✓").green() } else { style("✗").red() }; - let value = match mbs { - Some(s) => format!("{s:.2} MB/s"), - None => "failed".to_string(), - }; - let value = if *pass { style(value).green() } else { style(value).dim() }; - pb.set_message(format!(" {mark} {:<15} {value}", ip.to_string())); - } - None => pb.set_message(String::new()), - } - } - } - - fn clear(&self) { - for pb in std::iter::once(&self.header) - .chain(std::iter::once(&self.bucket_label)) - .chain(self.bucket.iter()) - .chain(std::iter::once(&self.recent_label)) - .chain(self.recent.iter()) - { - pb.finish_and_clear(); - } - } -} - -fn spin(msg: &str) -> ProgressBar { - let pb = ProgressBar::new_spinner(); - pb.enable_steady_tick(Duration::from_millis(90)); - pb.set_message(msg.to_string()); - pb -} - -fn secs(s: f64) -> Duration { - Duration::from_secs_f64(s) -} - -fn millis(ms: f64) -> Duration { - Duration::from_secs_f64(ms / 1000.0) -} diff --git a/src/easy/consumer.rs b/src/easy/consumer.rs new file mode 100644 index 0000000..4590766 --- /dev/null +++ b/src/easy/consumer.rs @@ -0,0 +1,139 @@ +//! The consumer side of `easy`: run `CONCURRENCY` independent speed screens off +//! the validated queue, promote anything ≥ target/n to a contention-free solo +//! confirm, and stop on the first IP that clears `target`. Failures periodically +//! recalibrate the direct link in case it drifted. + +use std::net::IpAddr; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use futures::stream::{FuturesUnordered, StreamExt}; + +use crate::speed; +use crate::vless::VlessNode; + +use super::state::{Found, State}; +use super::{ + CONCURRENCY, DEFAULT_SPEED_FRACTION, DIRECT_TIMEOUT, FAIL_RECALIBRATE, POLL, SPEED_BYTES, + SPEED_TIMEOUT, secs, +}; + +/// Run the screen/confirm loop until an IP clears `target`, returning the winner +/// (or `None` if the source dries up). +pub(super) async fn consume(node: Arc, state: State, explicit_target: bool) -> Option { + let mut inflight = FuturesUnordered::new(); + let mut active: Vec = Vec::new(); // IPs in `inflight`, mirrored for the UI + + loop { + // --- Confirm phase: promising IPs are waiting. Let the in-flight + // screens wind down (we can't interrupt a download), then re-test + // each candidate solo for a contention-free number. --- + if state.confirm_pending() { + while let Some((ip, lat, r)) = inflight.next().await { + active.retain(|x| *x != ip); + state.set_testing(&active); + if let Some(found) = classify(&state, ip, lat, r, explicit_target).await { + return Some(found); + } + } + state.begin_confirm(); + while let Some((ip, lat)) = state.pop_confirm() { + state.set_testing(&[ip]); + let r = + speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await; + match r { + Ok(s) if s >= state.target() => { + state.mark_found(ip, s); + return Some(Found { ip, latency: lat, mbs: s }); + } + Ok(s) => state.record_fail(ip, Some(s)), + Err(_) => state.record_fail(ip, None), + } + maybe_recalibrate(&state, explicit_target).await; + } + state.end_confirm(); + active.clear(); + continue; + } + + // --- Screen phase: top the in-flight set up to CONCURRENCY. --- + while inflight.len() < CONCURRENCY { + match state.pop_queue() { + Some((ip, lat)) => { + active.push(ip); + inflight.push(screen(node.clone(), ip, lat)); + } + None => break, + } + } + state.set_testing(&active); + + if inflight.is_empty() { + if state.is_drained() { + return None; + } + tokio::time::sleep(POLL).await; + continue; + } + + if let Some((ip, lat, r)) = inflight.next().await { + active.retain(|x| *x != ip); + state.set_testing(&active); + if let Some(found) = classify(&state, ip, lat, r, explicit_target).await { + return Some(found); + } + } + } +} + +/// One concurrent screen: download through the node and carry the IP + latency +/// along with the result. +async fn screen(node: Arc, ip: IpAddr, lat: Duration) -> (IpAddr, Duration, Result) { + let r = speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await; + (ip, lat, r) +} + +/// Sort a finished screen into one of three outcomes: a direct hit (≥ target, +/// done), a promising candidate (≥ target/n, queued for solo confirm), or a +/// failure (recorded; may trigger a direct-link recalibration). +async fn classify( + state: &State, + ip: IpAddr, + lat: Duration, + r: Result, + explicit_target: bool, +) -> Option { + let target = state.target(); + let floor = target / CONCURRENCY as f64; + match r { + Ok(s) if s >= target => { + state.mark_found(ip, s); + return Some(Found { ip, latency: lat, mbs: s }); + } + Ok(s) if s >= floor => state.push_confirm(ip, lat), + Ok(s) => { + state.record_fail(ip, Some(s)); + maybe_recalibrate(state, explicit_target).await; + } + Err(_) => { + state.record_fail(ip, None); + maybe_recalibrate(state, explicit_target).await; + } + } + None +} + +/// Every `FAIL_RECALIBRATE` failures, re-measure the direct link. If the local +/// link itself slowed down, a default (direct-derived) target would otherwise be +/// unreachable forever; an explicit --speed target is left untouched. +async fn maybe_recalibrate(state: &State, explicit_target: bool) { + if !state.recalibration_due(FAIL_RECALIBRATE) { + return; + } + state.begin_recalibrate(); + let base = speed::measure_direct(secs(DIRECT_TIMEOUT), SPEED_BYTES).await; + // Recompute only a default (direct-derived) target; leave explicit ones be. + let new_target = base.ok().filter(|_| !explicit_target).map(|b| b * DEFAULT_SPEED_FRACTION); + state.end_recalibrate(new_target); +} diff --git a/src/easy/mod.rs b/src/easy/mod.rs new file mode 100644 index 0000000..7503b0b --- /dev/null +++ b/src/easy/mod.rs @@ -0,0 +1,172 @@ +//! `easy`: find one IP fast enough to hit a target speed, then stop. +//! +//! A [`producer`] keeps a validated *queue* of IPs (those that pass ping + +//! latency) topped up between a low and high watermark. The [`consumer`] runs +//! `CONCURRENCY` independent screens off that queue: under n-way contention each +//! stream gets roughly link/n, so anything reaching `target/n` is worth a solo +//! confirm and anything already at `target` is a guaranteed pass. Promising IPs +//! go onto a separate confirm queue; once the in-flight screens wind down, +//! they're re-tested single-threaded (accurate, no contention) until one clears +//! `target`. Every so many failures we re-measure the direct link in case it +//! drifted. A fixed [`ui`] panel shows what's under test and the last few +//! results, sharing one mutex-guarded [`state`] bucket between the three tasks. + +mod consumer; +mod producer; +mod state; +mod ui; + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +use anyhow::{Result, anyhow}; +use console::style; +use indicatif::ProgressBar; + +use crate::cli::EasyArgs; +use crate::cloudflare::{self, Family}; +use crate::report::{print_easy_found, resolve_node, write_file}; +use crate::speed; + +use consumer::consume; +use producer::produce; +use state::{Found, State}; +use ui::{Panel, render_loop}; + +// Fixed knobs — the point of `easy` is to not expose these. +const CONCURRENCY: usize = 5; // independent screens in flight, also the screen floor divisor +const QUEUE_HIGH: usize = 50; // validated-queue refill target +const QUEUE_LOW: usize = 30; // refill kicks in once the queue drops below this +const PRODUCE_BATCH: usize = 50; // IPs per discovery round — one full latency wave (matches LAT_CONCURRENCY) +const RECENT: usize = 3; // finished results kept on screen for review +const FEED: usize = 5; // live discovery lines (ping/latency) shown while finding IPs + +const PING_CONCURRENCY: usize = 200; +const PING_TIMEOUT: f64 = 3.0; +const PING_MIN_MS: f64 = 10.0; +const PING_MAX_MS: f64 = 200.0; + +const LAT_CONCURRENCY: usize = 50; +const LAT_TIMEOUT: f64 = 5.0; +const LAT_MAX_MS: f64 = 300.0; + +const SPEED_TIMEOUT: f64 = 5.0; +const SPEED_BYTES: u64 = 50_000_000; // file is large; time/steady-state caps first + +const MAX_BARREN_ROUNDS: usize = 8; // give up after this many empty discovery rounds +const DEFAULT_SPEED_FRACTION: f64 = 0.80; // default target = this × direct speed + +const DIRECT_TIMEOUT: f64 = 8.0; // one longer direct measurement (the mirror is too bimodal to sample) +const FAIL_RECALIBRATE: usize = 100; // re-measure the direct link every this many failures + +const POLL: Duration = Duration::from_millis(50); // producer/consumer idle poll +const FRAME: Duration = Duration::from_millis(120); // render tick +const FRAMES: [&str; 10] = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; + +pub(crate) async fn run(args: EasyArgs) -> Result<()> { + let node = Arc::new(resolve_node(&args.node_file, &args.node)?); + if matches!(args.speed, Some(s) if s <= 0.0) { + return Err(anyhow!("--speed must be greater than 0")); + } + + // 0. Direct (no-proxy) speed via a domestic mirror — the link ceiling, and + // the source of the default target when --speed is omitted. One longer + // (8 s) measurement: the mirror's throughput is too bimodal for repeated + // short samples to agree on. + let spinner = spin("Measuring direct (no-proxy) download speed…"); + let baseline = speed::measure_direct(secs(DIRECT_TIMEOUT), SPEED_BYTES).await; + match &baseline { + Ok(mbs) => spinner.finish_with_message(format!( + "{} direct download speed: {} MB/s", + style("✓").green().bold(), + style(format!("{mbs:.2}")).cyan() + )), + Err(_) => spinner.finish_with_message(format!( + "{} direct speed unavailable (direct access blocked?)", + style("!").yellow().bold() + )), + } + + // Resolve the target: explicit --speed, else a fraction of the measured + // direct speed. Reject a target the local link can't reach; with neither a + // target nor a measured link there's nothing to aim for. + let explicit_target = args.speed.is_some(); + let (target, note) = match (args.speed, baseline) { + (Some(s), Ok(base)) if s > base => { + return Err(anyhow!( + "direct speed is only {base:.2} MB/s — can't find a node ≥ {s:.2} MB/s" + )); + } + (Some(s), _) => (s, String::new()), + (None, Ok(base)) => ( + base * DEFAULT_SPEED_FRACTION, + format!(" ({:.0}% of direct {base:.2})", DEFAULT_SPEED_FRACTION * 100.0), + ), + (None, Err(_)) => { + return Err(anyhow!( + "couldn't measure direct speed to pick a default target — pass --speed " + )); + } + }; + + let family = if args.ipv6 { Family::Both } else { Family::V4 }; + let spinner = spin("Fetching Cloudflare ranges…"); + let ranges = Arc::new(cloudflare::fetch_ranges(family).await?); + spinner.finish_with_message(format!( + "{} Cloudflare ranges: {} v4, {} v6", + style("✓").green().bold(), + ranges.v4.len(), + ranges.v6.len() + )); + eprintln!( + "{} target ≥ {} MB/s{}, up to {} valid IPs", + style("easy").bold().cyan(), + style(format!("{target:.2}")).cyan(), + style(note).dim(), + args.max + ); + + let state = State::new(target); + let producer = tokio::spawn(produce(ranges.clone(), node.clone(), args.max, args.ipv6, state.clone())); + let stop = Arc::new(AtomicBool::new(false)); + let ui = tokio::spawn(render_loop(state.clone(), Panel::new(), args.max, stop.clone())); + + let hit = consume(node.clone(), state.clone(), explicit_target).await; + + stop.store(true, Ordering::Relaxed); + let _ = ui.await; // render loop clears the view before returning + producer.abort(); + + match hit { + Some(Found { ip, latency, mbs }) => { + let alias = format!("{mbs:.2}MB-{:.0}ms-{}", latency.as_secs_f64() * 1000.0, ip); + let url = node.to_url(ip, &alias); + print_easy_found(ip, mbs, latency, &url); + write_file(&args.output.join("result.txt"), &format!("{url}\n"))?; + } + None => { + let (final_target, valid) = (state.target(), state.valid()); + eprintln!( + "{} no IP reached {final_target:.2} MB/s after testing {valid} valid IP(s)", + style("✗").red().bold() + ); + } + } + Ok(()) +} + +fn spin(msg: &str) -> ProgressBar { + let pb = ProgressBar::new_spinner(); + pb.enable_steady_tick(Duration::from_millis(90)); + pb.set_message(msg.to_string()); + pb +} + +fn secs(s: f64) -> Duration { + Duration::from_secs_f64(s) +} + +fn millis(ms: f64) -> Duration { + Duration::from_secs_f64(ms / 1000.0) +} diff --git a/src/easy/producer.rs b/src/easy/producer.rs new file mode 100644 index 0000000..0f44f43 --- /dev/null +++ b/src/easy/producer.rs @@ -0,0 +1,131 @@ +//! The discovery producer: refill the validated queue between watermarks by +//! pipelining ping → latency, streaming each reachable IP straight into the +//! latency stage so both run at once instead of latency waiting for the whole +//! ping batch. Surplus from a round is kept, not discarded. + +use std::net::IpAddr; +use std::sync::Arc; + +use futures::channel::mpsc; +use futures::stream::StreamExt; + +use crate::cloudflare::CfRanges; +use crate::latency::{self, LatStatus}; +use crate::ping; +use crate::vless::VlessNode; + +use super::state::State; +use super::ui::{lat_feed_line, ping_feed_line}; +use super::{ + LAT_CONCURRENCY, LAT_MAX_MS, LAT_TIMEOUT, MAX_BARREN_ROUNDS, PING_CONCURRENCY, PING_MAX_MS, + PING_MIN_MS, PING_TIMEOUT, POLL, PRODUCE_BATCH, QUEUE_HIGH, QUEUE_LOW, millis, secs, +}; + +/// Keep the validated queue topped up to `QUEUE_HIGH` once it drops below +/// `QUEUE_LOW`, idling otherwise, until `max_valid` are minted, the consumer +/// signals `found`, or the source dries up. +pub(super) async fn produce( + ranges: Arc, + node: Arc, + max_valid: usize, + ipv6: bool, + state: State, +) { + let mut barren = 0usize; + let mut refilling = true; // start by filling the empty queue + while let Some((q, valid)) = state.refill_check(max_valid) { + if refilling && q >= QUEUE_HIGH { + refilling = false; + } else if !refilling && q < QUEUE_LOW { + refilling = true; + barren = 0; + } + if !refilling { + tokio::time::sleep(POLL).await; + continue; + } + + let batch = QUEUE_HIGH.saturating_sub(q).min(max_valid - valid).clamp(1, PRODUCE_BATCH); + let cfg = ping::PingConfig { + count: batch, + timeout: secs(PING_TIMEOUT), + concurrency: PING_CONCURRENCY, + ipv6, + port: 443, + max_probe: batch.saturating_mul(200), + times: 1, + min_latency: millis(PING_MIN_MS), + max_latency: millis(PING_MAX_MS), + max_loss: 0.0, + }; + let minted = match discover_round(&ranges, &node, &cfg, max_valid, &state).await { + Ok(minted) => minted, + Err(()) => break, + }; + if minted == 0 { + barren += 1; + if barren >= MAX_BARREN_ROUNDS { + break; + } + } else { + barren = 0; + } + } + state.set_producer_done(); +} + +/// One pipelined discovery round: ping streams each reachable IP into the +/// latency stage; passing IPs are enqueued. Returns how many were minted, or +/// `Err(())` if the ping stage failed outright. +async fn discover_round( + ranges: &CfRanges, + node: &Arc, + cfg: &ping::PingConfig, + max_valid: usize, + state: &State, +) -> Result { + let (tx, rx) = mpsc::unbounded::(); + + let state_ping = state.clone(); + let ping_side = async { + ping::run(ranges, cfg, move |p| match p { + ping::Progress::Probing { ip, status, latency, .. } => { + if let Some(line) = ping_feed_line(ip, &status, latency) { + state_ping.push_feed(line); + let _ = tx.unbounded_send(ip); + } + } + }) + .await + }; + + let state_lat = state.clone(); + let node_lat = node.clone(); + let lat_side = async move { + let max_lat = millis(LAT_MAX_MS); + let mut minted = 0usize; + let mut results = rx + .map(|ip| { + let node = node_lat.clone(); + async move { (ip, latency::measure(node.as_ref(), ip, secs(LAT_TIMEOUT)).await) } + }) + .buffer_unordered(LAT_CONCURRENCY); + while let Some((ip, r)) = results.next().await { + let status = match r { + Ok(d) if max_lat.is_zero() || d <= max_lat => LatStatus::Ok(d), + Ok(d) => LatStatus::TooSlow(d), + Err(e) => LatStatus::Failed(e.to_string()), + }; + state_lat.push_feed(lat_feed_line(ip, &status)); + if let LatStatus::Ok(d) = status { + if state_lat.try_enqueue(ip, d, max_valid) { + minted += 1; + } + } + } + minted + }; + + let (ping_res, minted) = futures::join!(ping_side, lat_side); + ping_res.map(|_| minted).map_err(|_| ()) +} diff --git a/src/easy/state.rs b/src/easy/state.rs new file mode 100644 index 0000000..d376bfe --- /dev/null +++ b/src/easy/state.rs @@ -0,0 +1,223 @@ +//! Shared, observable state for an `easy` run: the validated queue, the confirm +//! queue, and the live testing/feed/recent views, all behind one [`State`] +//! handle whose methods are the *only* way to touch the bucket. A plain mutex +//! (never held across `.await`) is enough: producer and consumer drive it +//! through these methods, the renderer reads a [`Snapshot`]. + +use std::collections::VecDeque; +use std::net::IpAddr; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::time::Duration; + +use super::{FEED, RECENT}; + +/// The winning IP: address, carried latency, measured speed (MB/s). +pub(super) struct Found { + pub(super) ip: IpAddr, + pub(super) latency: Duration, + pub(super) mbs: f64, +} + +/// A point-in-time copy of everything the renderer draws. +pub(super) struct Snapshot { + pub(super) testing: Vec, + pub(super) feed: Vec, + pub(super) recent: Vec<(IpAddr, Option, bool)>, // newest first + pub(super) valid: usize, + pub(super) queued: usize, + pub(super) confirming: bool, + pub(super) recalibrating: bool, +} + +/// One finished speed test, kept for the "recent" panel. +struct Finished { + ip: IpAddr, + mbs: Option, + pass: bool, +} + +struct Bucket { + queue: VecDeque<(IpAddr, Duration)>, // validated, awaiting a concurrent screen + confirm: VecDeque<(IpAddr, Duration)>, // promising (≥ target/n), awaiting a solo confirm + testing: Vec, // IPs under test right now (screen or confirm) + feed: VecDeque, // last FEED discovery lines (producer's ping/latency) + recent: VecDeque, // last RECENT finished, oldest first + valid: usize, // total minted (gates --max + header) + fails: usize, // tested-and-rejected count (drives recalibration) + target: f64, // current target MB/s (recalibrated mid-run) + confirming: bool, // a solo confirm is running (pauses screens, tints UI) + recalibrating: bool, // a direct re-measure is running + producer_done: bool, + found: bool, +} + +/// Cloneable handle to the shared bucket. All locking lives here. +#[derive(Clone)] +pub(super) struct State(Arc>); + +impl State { + /// A fresh state aimed at `target` MB/s. + pub(super) fn new(target: f64) -> Self { + State(Arc::new(Mutex::new(Bucket { + queue: VecDeque::new(), + confirm: VecDeque::new(), + testing: Vec::new(), + feed: VecDeque::new(), + recent: VecDeque::new(), + valid: 0, + fails: 0, + target, + confirming: false, + recalibrating: false, + producer_done: false, + found: false, + }))) + } + + fn lock(&self) -> MutexGuard<'_, Bucket> { + self.0.lock().unwrap() + } + + // --- reads --- + + pub(super) fn target(&self) -> f64 { + self.lock().target + } + + pub(super) fn valid(&self) -> usize { + self.lock().valid + } + + pub(super) fn confirm_pending(&self) -> bool { + !self.lock().confirm.is_empty() + } + + /// True once the producer is finished and nothing remains to test. + pub(super) fn is_drained(&self) -> bool { + let b = self.lock(); + b.producer_done && b.queue.is_empty() && b.confirm.is_empty() + } + + /// Whether a direct-link recalibration is due (every `every` failures). + pub(super) fn recalibration_due(&self, every: usize) -> bool { + let b = self.lock(); + b.fails > 0 && b.fails % every == 0 && !b.found + } + + // --- consumer --- + + pub(super) fn pop_queue(&self) -> Option<(IpAddr, Duration)> { + self.lock().queue.pop_front() + } + + pub(super) fn pop_confirm(&self) -> Option<(IpAddr, Duration)> { + self.lock().confirm.pop_front() + } + + pub(super) fn push_confirm(&self, ip: IpAddr, lat: Duration) { + self.lock().confirm.push_back((ip, lat)); + } + + pub(super) fn set_testing(&self, ips: &[IpAddr]) { + self.lock().testing = ips.to_vec(); + } + + pub(super) fn begin_confirm(&self) { + self.lock().confirming = true; + } + + pub(super) fn end_confirm(&self) { + let mut b = self.lock(); + b.confirming = false; + b.testing.clear(); + } + + /// Mark the search won and record the winner in the recent panel. + pub(super) fn mark_found(&self, ip: IpAddr, mbs: f64) { + let mut b = self.lock(); + b.found = true; + push_recent(&mut b, ip, Some(mbs), true); + b.testing.clear(); + } + + /// Record a rejected IP and bump the failure counter. + pub(super) fn record_fail(&self, ip: IpAddr, mbs: Option) { + let mut b = self.lock(); + b.fails += 1; + push_recent(&mut b, ip, mbs, false); + } + + pub(super) fn begin_recalibrate(&self) { + let mut b = self.lock(); + b.recalibrating = true; + b.testing.clear(); + } + + pub(super) fn end_recalibrate(&self, new_target: Option) { + let mut b = self.lock(); + b.recalibrating = false; + if let Some(t) = new_target { + b.target = t; + } + } + + // --- producer --- + + /// Stop signal plus a one-lock read of `(queue_len, valid)`; `None` means the + /// producer should stop (found, or the valid cap is reached). + pub(super) fn refill_check(&self, max_valid: usize) -> Option<(usize, usize)> { + let b = self.lock(); + if b.found || b.valid >= max_valid { + None + } else { + Some((b.queue.len(), b.valid)) + } + } + + /// Enqueue a validated IP unless we've already stopped; returns whether it + /// was minted. + pub(super) fn try_enqueue(&self, ip: IpAddr, lat: Duration, max_valid: usize) -> bool { + let mut b = self.lock(); + if b.found || b.valid >= max_valid { + return false; + } + b.queue.push_back((ip, lat)); + b.valid += 1; + true + } + + /// Push one line onto the bounded discovery feed (newest last). + pub(super) fn push_feed(&self, line: String) { + let mut b = self.lock(); + b.feed.push_back(line); + while b.feed.len() > FEED { + b.feed.pop_front(); + } + } + + pub(super) fn set_producer_done(&self) { + self.lock().producer_done = true; + } + + // --- renderer --- + + pub(super) fn snapshot(&self) -> Snapshot { + let b = self.lock(); + Snapshot { + testing: b.testing.clone(), + feed: b.feed.iter().cloned().collect(), + recent: b.recent.iter().rev().map(|f| (f.ip, f.mbs, f.pass)).collect(), + valid: b.valid, + queued: b.queue.len(), + confirming: b.confirming, + recalibrating: b.recalibrating, + } + } +} + +fn push_recent(b: &mut Bucket, ip: IpAddr, mbs: Option, pass: bool) { + b.recent.push_back(Finished { ip, mbs, pass }); + while b.recent.len() > RECENT { + b.recent.pop_front(); + } +} diff --git a/src/easy/ui.rs b/src/easy/ui.rs new file mode 100644 index 0000000..3a639f9 --- /dev/null +++ b/src/easy/ui.rs @@ -0,0 +1,189 @@ +//! The fixed in-place panel — header, discovery feed, testing rows, recent rows +//! — and the line formatters that feed it. + +use std::net::IpAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +use console::{Style, style}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; + +use crate::latency::LatStatus; +use crate::ping; + +use super::state::{Snapshot, State}; +use super::{CONCURRENCY, FEED, FRAME, FRAMES, QUEUE_LOW, RECENT}; + +/// Periodically redraw the panel until `stop`, then clear it. +pub(super) async fn render_loop(state: State, panel: Panel, max: usize, stop: Arc) { + let mut tick = 0usize; + loop { + panel.render(&state, max, tick); + if stop.load(Ordering::Relaxed) { + break; + } + tick += 1; + tokio::time::sleep(FRAME).await; + } + panel.clear(); +} + +/// The fixed in-place panel: a header, `CONCURRENCY` "testing" rows, and +/// `RECENT` "recent result" rows — each an indicatif line whose message we set. +pub(super) struct Panel { + _mp: MultiProgress, + header: ProgressBar, + discover_label: ProgressBar, + discover: Vec, + testing_label: ProgressBar, + testing: Vec, + recent_label: ProgressBar, + recent: Vec, +} + +impl Panel { + pub(super) fn new() -> Self { + let mp = MultiProgress::new(); + let line = |mp: &MultiProgress| { + let pb = mp.add(ProgressBar::new(1)); + pb.set_style(ProgressStyle::with_template("{msg}").unwrap()); + pb + }; + let header = line(&mp); + let discover_label = line(&mp); + let discover = (0..FEED).map(|_| line(&mp)).collect(); + let testing_label = line(&mp); + let testing = (0..CONCURRENCY).map(|_| line(&mp)).collect(); + let recent_label = line(&mp); + let recent = (0..RECENT).map(|_| line(&mp)).collect(); + testing_label.set_message(format!(" {}", style("testing").dim())); + recent_label.set_message(format!(" {}", style("recent").dim())); + Self { _mp: mp, header, discover_label, discover, testing_label, testing, recent_label, recent } + } + + fn render(&self, state: &State, max: usize, tick: usize) { + let frame = FRAMES[tick % FRAMES.len()]; + let Snapshot { testing, feed, recent, valid, queued, confirming, recalibrating } = + state.snapshot(); + + // Header: the search budget remaining (max valid IPs minus those minted). + // It counts down to 0 — reaching 0 means the search gave up. + if recalibrating { + self.header + .set_message(format!("{} re-measuring direct speed…", style(frame).yellow())); + } else { + // Budget remaining: green when plenty, fading to red as it nears 0. + let remaining = max.saturating_sub(valid); + let ratio = if max == 0 { 0.0 } else { remaining as f64 / max as f64 }; + let remaining = Style::new().color256(health_color(ratio)).bold().apply_to(remaining); + self.header + .set_message(format!("{} finding fast IPs… {}", style(frame).cyan(), remaining)); + } + + // Discovery label carries the live queue depth: red when near-empty, + // greening as it climbs to the refill watermark (QUEUE_LOW). Dimmed so it + // reads as a soft secondary indicator. + let q_ratio = queued as f64 / QUEUE_LOW as f64; + let queued = Style::new().color256(health_color(q_ratio)).dim().apply_to(queued); + self.discover_label + .set_message(format!(" {} {}", style("discovering").dim(), queued)); + + // Discovery feed: the ping + latency live log, scrolling (newest last). + for (i, pb) in self.discover.iter().enumerate() { + match feed.get(i) { + Some(line) => pb.set_message(line.clone()), + None => pb.set_message(String::new()), + } + } + + // Testing rows: each IP currently under test (screen, or solo confirm). + let verb = if confirming { "confirming…" } else { "testing…" }; + for (i, pb) in self.testing.iter().enumerate() { + match testing.get(i) { + Some(ip) => { + let label = if confirming { + style(verb).magenta() + } else { + style(verb).cyan() + }; + pb.set_message(format!(" {} {:<15} {}", style(frame).cyan(), ip.to_string(), label)); + } + None => pb.set_message(String::new()), + } + } + + // Recent results, newest first. + for (i, pb) in self.recent.iter().enumerate() { + match recent.get(i) { + Some((ip, mbs, pass)) => { + let mark = if *pass { style("✓").green() } else { style("✗").red() }; + let value = match mbs { + Some(s) => format!("{s:.2} MB/s"), + None => "failed".to_string(), + }; + let value = if *pass { style(value).green() } else { style(value).dim() }; + pb.set_message(format!(" {mark} {:<15} {value}", ip.to_string())); + } + None => pb.set_message(String::new()), + } + } + } + + fn clear(&self) { + for pb in std::iter::once(&self.header) + .chain(std::iter::once(&self.discover_label)) + .chain(self.discover.iter()) + .chain(std::iter::once(&self.testing_label)) + .chain(self.testing.iter()) + .chain(std::iter::once(&self.recent_label)) + .chain(self.recent.iter()) + { + pb.finish_and_clear(); + } + } +} + +/// Map a 0.0 (worst) .. 1.0 (best) ratio to a green→red ANSI-256 colour, with +/// enough stops to read as a gradient rather than a few coarse buckets. +fn health_color(ratio: f64) -> u8 { + // A clean green→yellow→red path through the 6×6×6 colour cube (red first). + const RAMP: [u8; 11] = [196, 202, 208, 214, 220, 226, 190, 154, 118, 82, 46]; + let i = (ratio.clamp(0.0, 1.0) * (RAMP.len() - 1) as f64).round() as usize; + RAMP[i] +} + +/// A ping-stage feed line for an IP that passed the filters +/// (`[ping] `); `None` for the unreachable/filtered firehose. +pub(super) fn ping_feed_line(ip: IpAddr, status: &ping::ProbeStatus, latency: Option) -> Option { + if !matches!(status, ping::ProbeStatus::Ok) { + return None; + } + let ms = format!("{:.0}ms", latency?.as_secs_f64() * 1000.0); + Some(format!( + " {} {} {}", + style(format!("{:<9}", "[ping]")).blue().dim(), + style(format!("{:<15}", ip.to_string())).dim(), + style(ms).green().dim(), + )) +} + +/// A latency-stage feed line (`[latency] `), dimmed like the +/// `latency` command's live log. +pub(super) fn lat_feed_line(ip: IpAddr, status: &LatStatus) -> String { + let tag = style(format!("{:<9}", "[latency]")).cyan().dim(); + let addr = style(format!("{:<15}", ip.to_string())).dim(); + match status { + LatStatus::Ok(d) => { + let ms = format!("{:.0}ms", d.as_secs_f64() * 1000.0); + format!(" {tag} {addr} {}", style(ms).green().dim()) + } + LatStatus::TooSlow(d) => { + let ms = format!("{:.0}ms", d.as_secs_f64() * 1000.0); + format!(" {tag} {addr} {}", style(format!("{ms} too slow")).red().dim()) + } + LatStatus::Failed(why) => { + format!(" {tag} {addr} {}", style(why.clone()).red().dim()) + } + } +}