feat: rework easy with pipelined discovery, solo-confirm, and a live panel

This commit is contained in:
chuan
2026-06-26 01:51:23 +08:00
Unverified
parent a0425324c0
commit 9de35210b7
7 changed files with 857 additions and 470 deletions
+3 -3
View File
@@ -112,9 +112,9 @@ auto 使用各阶段默认的超时、并发、丢包等参数;需精调时单
### easy — `fast-xray easy [NODE] --speed <MB/s> [OPTIONS]`
只要一个够快的 IP 就停。后台持续用 `ping + latency` 往池子(桶)里补充有效 IP;前台每轮并发筛一批(5 个),并发下就达标的直接命中,有潜力的(≥ `目标/5`)再单独确认,第一个达到 `--speed` 的即为结果,直接输出可复制的节点。
只要一个够快的 IP 就停。后台持续用 `ping + latency` 补充有效 IP 队列(低于 30 时补到 50);前台用 5 个独立 worker 并发筛,谁测完谁立刻取下一个,互不等待。并发下直接达到 `--speed` 的直接命中;有潜力的(≥ `目标/5`)记入确认队列,等手头这批并发跑完后再**逐个单独(无并发)**测速确认真实速度,第一个达标的即为结果,直接输出可复制的节点。
先直连国内镜像(清华 TUNA)一次本机不走代理下载速,作为带宽上限:`--speed` 高于它直接判定不可能;镜像偶发不可用则跳过、照常搜索。
先直连国内镜像(清华 TUNA)一次 8 秒的不走代理下载速,作为带宽上限:`--speed` 高于它直接判定不可能;镜像偶发不可用则跳过、照常搜索。搜索中每累计 100 次失败会重测一次直连速度,本机链路变慢时按新值下调默认目标(显式 `--speed` 则不动)。
| 参数 | 默认 | 说明 |
| --- | --- | --- |
@@ -124,7 +124,7 @@ auto 使用各阶段默认的超时、并发、丢包等参数;需精调时单
| `-6, --ipv6` | off | 同时搜索 IPv6 |
| `-o, --output` | result | 输出目录(写 result.txt) |
固定参数:ping 并发 100、≤200ms;latency 并发 10、≤200ms;测速并发筛 5 个再单独确认、每次上限 5s
固定参数:ping 并发 100、≤200ms;latency 并发 10、≤200ms;测速 5 路并发筛、有潜力的单独确认、每次上限 5s;有效队列水位 30/50
### ping — `fast-xray ping [OPTIONS]`
-467
View File
@@ -1,467 +0,0 @@
//! `easy`: find one IP fast enough to hit a target speed, then stop.
//!
//! A producer keeps a bounded *bucket* of validated IPs (those that pass ping +
//! latency) topped up. A consumer pulls a batch and screens it concurrently:
//! under n-way contention each stream gets roughly link/n, so anything reaching
//! `target/n` is worth a solo confirm and anything already at `target` is a
//! guaranteed pass. The promising ones are then re-tested single-threaded (for
//! an accurate number) until one clears `target`. A live "bucket / recent" view
//! shows what's queued, what's being tested, and the last few finished results.
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{Result, anyhow};
use console::style;
use futures::future::join_all;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use crate::cli::EasyArgs;
use crate::cloudflare::{self, CfRanges, Family};
use crate::latency;
use crate::ping;
use crate::report::{print_easy_found, resolve_node, write_file};
use crate::speed;
use crate::vless::VlessNode;
// Fixed knobs — the point of `easy` is to not expose these.
const POOL: usize = 10; // bucket capacity (testing + waiting), also the live row count
const BATCH: usize = 5; // IPs screened concurrently per round
const RECENT: usize = 5; // finished results kept on screen for review
const PING_CONCURRENCY: usize = 100;
const PING_TIMEOUT: f64 = 3.0;
const PING_MIN_MS: f64 = 10.0;
const PING_MAX_MS: f64 = 200.0;
const LAT_CONCURRENCY: usize = 10;
const LAT_TIMEOUT: f64 = 5.0;
const LAT_MAX_MS: f64 = 200.0;
const SPEED_TIMEOUT: f64 = 5.0;
const SPEED_BYTES: u64 = 50_000_000; // file is large; time/steady-state caps first
const MAX_BARREN_ROUNDS: usize = 8; // give up after this many empty discovery rounds
const DEFAULT_SPEED_FRACTION: f64 = 0.80; // default target = this × direct speed
const POLL: Duration = Duration::from_millis(50); // producer/consumer idle poll
const FRAME: Duration = Duration::from_millis(120); // render tick
const FRAMES: [&str; 10] = ["", "", "", "", "", "", "", "", "", ""];
/// One finished speed test, kept for the "recent" panel.
struct Finished {
ip: IpAddr,
mbs: Option<f64>,
pass: bool,
}
/// Shared, observable bucket. A plain mutex (never held across `.await`) is
/// enough: producer and consumer poll it, the renderer reads snapshots.
struct Bucket {
waiting: VecDeque<(IpAddr, Duration)>, // validated, not yet speed-tested
testing: Vec<IpAddr>, // the batch being screened / confirmed
recent: VecDeque<Finished>, // last RECENT finished, oldest first
valid: usize, // total minted (gates --max + header)
producer_done: bool,
found: bool,
}
type Shared = Arc<Mutex<Bucket>>;
pub(crate) async fn run(args: EasyArgs) -> Result<()> {
let node = Arc::new(resolve_node(&args.node_file, &args.node)?);
if matches!(args.speed, Some(s) if s <= 0.0) {
return Err(anyhow!("--speed must be greater than 0"));
}
// 0. Direct (no-proxy) speed via a domestic mirror — the link ceiling, and
// the source of the default target when --speed is omitted.
let spinner = spin("Measuring direct (no-proxy) download speed…");
let baseline = speed::measure_direct(secs(SPEED_TIMEOUT), SPEED_BYTES).await;
match &baseline {
Ok(mbs) => spinner.finish_with_message(format!(
"{} direct download speed: {} MB/s",
style("").green().bold(),
style(format!("{mbs:.2}")).cyan()
)),
Err(_) => spinner.finish_with_message(format!(
"{} direct speed unavailable (direct access blocked?)",
style("!").yellow().bold()
)),
}
// Resolve the target: explicit --speed, else a fraction of the measured
// direct speed. Reject a target the local link can't reach; with neither a
// target nor a measured link there's nothing to aim for.
let (target, note) = match (args.speed, baseline) {
(Some(s), Ok(base)) if s > base => {
return Err(anyhow!(
"direct speed is only {base:.2} MB/s — can't find a node ≥ {s:.2} MB/s"
));
}
(Some(s), _) => (s, String::new()),
(None, Ok(base)) => (
base * DEFAULT_SPEED_FRACTION,
format!(" ({:.0}% of direct {base:.2})", DEFAULT_SPEED_FRACTION * 100.0),
),
(None, Err(_)) => {
return Err(anyhow!(
"couldn't measure direct speed to pick a default target — pass --speed <MB/s>"
));
}
};
let family = if args.ipv6 { Family::Both } else { Family::V4 };
let spinner = spin("Fetching Cloudflare ranges…");
let ranges = Arc::new(cloudflare::fetch_ranges(family).await?);
spinner.finish_with_message(format!(
"{} Cloudflare ranges: {} v4, {} v6",
style("").green().bold(),
ranges.v4.len(),
ranges.v6.len()
));
eprintln!(
"{} target ≥ {} MB/s{}, up to {} valid IPs",
style("easy").bold().cyan(),
style(format!("{target:.2}")).cyan(),
style(note).dim(),
args.max
);
let shared: Shared = Arc::new(Mutex::new(Bucket {
waiting: VecDeque::new(),
testing: Vec::new(),
recent: VecDeque::new(),
valid: 0,
producer_done: false,
found: false,
}));
let producer = tokio::spawn(produce(ranges.clone(), node.clone(), args.max, args.ipv6, shared.clone()));
let stop = Arc::new(AtomicBool::new(false));
let ui = tokio::spawn(render_loop(shared.clone(), BucketView::new(), target, args.max, stop.clone()));
// Consumer: pull a batch, screen it concurrently, then confirm the
// promising ones single-threaded (for an accurate number).
let mut hit: Option<(IpAddr, Duration, f64)> = None;
let mut tested = 0usize;
// What the locked acquire step decided — kept tiny so the guard is released
// before any await.
enum Step {
Stop,
Wait,
Go(Vec<(IpAddr, Duration)>),
}
'consume: loop {
let step = {
let mut b = shared.lock().unwrap();
let n = b.waiting.len().min(BATCH);
if b.found || (n == 0 && b.producer_done) {
Step::Stop
} else if n == 0 {
Step::Wait
} else {
let batch: Vec<_> = b.waiting.drain(..n).collect();
b.testing = batch.iter().map(|(ip, _)| *ip).collect();
Step::Go(batch)
}
};
let batch = match step {
Step::Stop => break,
Step::Wait => {
tokio::time::sleep(POLL).await;
continue;
}
Step::Go(batch) => batch,
};
let n = batch.len();
tested += n;
// Screen the whole batch at once.
let screened = join_all(batch.into_iter().map(|(ip, lat)| {
let node = node.clone();
async move {
let r =
speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await;
(ip, lat, r)
}
}))
.await;
// Anything at target/n is promising; below that (or failed) is dropped.
let floor = target / n as f64;
let mut candidates: Vec<(IpAddr, Duration, f64)> = Vec::new();
{
let mut b = shared.lock().unwrap();
for (ip, lat, r) in screened {
match r {
Ok(s) if s >= floor => candidates.push((ip, lat, s)),
Ok(s) => b.recent.push_back(Finished { ip, mbs: Some(s), pass: false }),
Err(_) => b.recent.push_back(Finished { ip, mbs: None, pass: false }),
}
}
while b.recent.len() > RECENT {
b.recent.pop_front();
}
b.testing = candidates.iter().map(|(ip, _, _)| *ip).collect();
}
// Confirm promising IPs solo, fastest screen result first.
candidates.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
for (ip, lat, _) in candidates {
if shared.lock().unwrap().found {
break 'consume;
}
let r = speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await;
let (mbs, pass) = match &r {
Ok(s) => (Some(*s), *s >= target),
Err(_) => (None, false),
};
let mut b = shared.lock().unwrap();
b.testing.retain(|x| *x != ip);
b.recent.push_back(Finished { ip, mbs, pass });
while b.recent.len() > RECENT {
b.recent.pop_front();
}
if pass {
b.found = true;
hit = Some((ip, lat, mbs.unwrap()));
break 'consume;
}
}
}
stop.store(true, Ordering::Relaxed);
let _ = ui.await; // render loop clears the view before returning
producer.abort();
match hit {
Some((ip, latency, mbs)) => {
let alias = format!("{mbs:.2}MB-{:.0}ms-{}", latency.as_secs_f64() * 1000.0, ip);
let url = node.to_url(ip, &alias);
print_easy_found(ip, mbs, latency, &url);
write_file(&args.output.join("result.txt"), &format!("{url}\n"))?;
}
None => eprintln!(
"{} no IP reached {target:.2} MB/s after testing {tested} valid IP(s)",
style("").red().bold()
),
}
Ok(())
}
/// Background producer: ping → latency to mint validated IPs into the bucket,
/// blocking while the bucket is full, until `max_valid` are minted, the
/// consumer signals `found`, or the source dries up.
async fn produce(
ranges: Arc<CfRanges>,
node: Arc<VlessNode>,
max_valid: usize,
ipv6: bool,
shared: Shared,
) {
let mut barren = 0usize;
'outer: loop {
{
let b = shared.lock().unwrap();
if b.found || b.valid >= max_valid {
break;
}
}
let cfg = ping::PingConfig {
count: POOL,
timeout: secs(PING_TIMEOUT),
concurrency: PING_CONCURRENCY,
ipv6,
port: 443,
max_probe: POOL.saturating_mul(200),
times: 1,
min_latency: millis(PING_MIN_MS),
max_latency: millis(PING_MAX_MS),
max_loss: 0.0,
};
let reachable = match ping::run(ranges.as_ref(), &cfg, |_| {}).await {
Ok(r) => r,
Err(_) => break,
};
if reachable.is_empty() {
barren += 1;
if barren >= MAX_BARREN_ROUNDS {
break;
}
continue;
}
let ips: Vec<IpAddr> = reachable.iter().map(|r| r.ip).collect();
let passed = latency::run(
node.as_ref(),
&ips,
LAT_CONCURRENCY,
secs(LAT_TIMEOUT),
millis(LAT_MAX_MS),
0,
|_| {},
)
.await;
let mut minted = 0usize;
for r in passed {
if !enqueue(&shared, (r.ip, r.latency), max_valid).await {
break 'outer; // found, or hit the valid cap
}
minted += 1;
}
if minted == 0 {
barren += 1;
if barren >= MAX_BARREN_ROUNDS {
break;
}
} else {
barren = 0;
}
}
shared.lock().unwrap().producer_done = true;
}
/// Wait for a free bucket slot and enqueue `item`; return false if we should
/// stop minting (consumer found a hit, or the valid cap is reached).
async fn enqueue(shared: &Shared, item: (IpAddr, Duration), max_valid: usize) -> bool {
loop {
{
let mut b = shared.lock().unwrap();
if b.found || b.valid >= max_valid {
return false;
}
if b.waiting.len() + b.testing.len() < POOL {
b.waiting.push_back(item);
b.valid += 1;
return true;
}
}
tokio::time::sleep(POLL).await;
}
}
/// Periodically redraw the bucket view until `stop`, then clear it.
async fn render_loop(shared: Shared, view: BucketView, target: f64, max: usize, stop: Arc<AtomicBool>) {
let mut tick = 0usize;
loop {
view.render(&shared, target, max, tick);
if stop.load(Ordering::Relaxed) {
break;
}
tick += 1;
tokio::time::sleep(FRAME).await;
}
view.clear();
}
/// The fixed, in-place "bucket / recent" panel: a header line, the bucket rows,
/// and the recent-results rows, each an indicatif line whose message we set.
struct BucketView {
_mp: MultiProgress,
header: ProgressBar,
bucket_label: ProgressBar,
bucket: Vec<ProgressBar>,
recent_label: ProgressBar,
recent: Vec<ProgressBar>,
}
impl BucketView {
fn new() -> Self {
let mp = MultiProgress::new();
let line = |mp: &MultiProgress| {
let pb = mp.add(ProgressBar::new(1));
pb.set_style(ProgressStyle::with_template("{msg}").unwrap());
pb
};
let header = line(&mp);
let bucket_label = line(&mp);
let bucket = (0..POOL).map(|_| line(&mp)).collect();
let recent_label = line(&mp);
let recent = (0..RECENT).map(|_| line(&mp)).collect();
bucket_label.set_message(format!(" {}", style("bucket").dim()));
recent_label.set_message(format!(" {}", style("recent").dim()));
Self { _mp: mp, header, bucket_label, bucket, recent_label, recent }
}
fn render(&self, shared: &Shared, target: f64, max: usize, tick: usize) {
let frame = FRAMES[tick % FRAMES.len()];
let (testing, waiting, recent, valid) = {
let b = shared.lock().unwrap();
(
b.testing.clone(),
b.waiting.iter().map(|(ip, _)| *ip).collect::<Vec<_>>(),
b.recent
.iter()
.rev() // newest first
.map(|f| (f.ip, f.mbs, f.pass))
.collect::<Vec<_>>(),
b.valid,
)
};
self.header.set_message(format!(
"{} {}/{} valid target ≥ {:.2} MB/s",
style(frame).cyan(),
valid,
max,
target
));
// Bucket rows: the batch under test (spinner), then the ones waiting.
let mut rows: Vec<String> = Vec::new();
for ip in &testing {
rows.push(format!(
" {} {:<15} {}",
style(frame).cyan(),
ip.to_string(),
style("testing…").cyan()
));
}
for ip in &waiting {
rows.push(format!(" {:<15} {}", ip.to_string(), style("waiting").dim()));
}
for (pb, row) in self.bucket.iter().zip(rows.iter().chain(std::iter::repeat(&String::new()))) {
pb.set_message(row.clone());
}
// Recent results, newest first.
for (i, pb) in self.recent.iter().enumerate() {
match recent.get(i) {
Some((ip, mbs, pass)) => {
let mark = if *pass { style("").green() } else { style("").red() };
let value = match mbs {
Some(s) => format!("{s:.2} MB/s"),
None => "failed".to_string(),
};
let value = if *pass { style(value).green() } else { style(value).dim() };
pb.set_message(format!(" {mark} {:<15} {value}", ip.to_string()));
}
None => pb.set_message(String::new()),
}
}
}
fn clear(&self) {
for pb in std::iter::once(&self.header)
.chain(std::iter::once(&self.bucket_label))
.chain(self.bucket.iter())
.chain(std::iter::once(&self.recent_label))
.chain(self.recent.iter())
{
pb.finish_and_clear();
}
}
}
fn spin(msg: &str) -> ProgressBar {
let pb = ProgressBar::new_spinner();
pb.enable_steady_tick(Duration::from_millis(90));
pb.set_message(msg.to_string());
pb
}
fn secs(s: f64) -> Duration {
Duration::from_secs_f64(s)
}
fn millis(ms: f64) -> Duration {
Duration::from_secs_f64(ms / 1000.0)
}
+139
View File
@@ -0,0 +1,139 @@
//! The consumer side of `easy`: run `CONCURRENCY` independent speed screens off
//! the validated queue, promote anything ≥ target/n to a contention-free solo
//! confirm, and stop on the first IP that clears `target`. Failures periodically
//! recalibrate the direct link in case it drifted.
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use futures::stream::{FuturesUnordered, StreamExt};
use crate::speed;
use crate::vless::VlessNode;
use super::state::{Found, State};
use super::{
CONCURRENCY, DEFAULT_SPEED_FRACTION, DIRECT_TIMEOUT, FAIL_RECALIBRATE, POLL, SPEED_BYTES,
SPEED_TIMEOUT, secs,
};
/// Run the screen/confirm loop until an IP clears `target`, returning the winner
/// (or `None` if the source dries up).
pub(super) async fn consume(node: Arc<VlessNode>, state: State, explicit_target: bool) -> Option<Found> {
let mut inflight = FuturesUnordered::new();
let mut active: Vec<IpAddr> = Vec::new(); // IPs in `inflight`, mirrored for the UI
loop {
// --- Confirm phase: promising IPs are waiting. Let the in-flight
// screens wind down (we can't interrupt a download), then re-test
// each candidate solo for a contention-free number. ---
if state.confirm_pending() {
while let Some((ip, lat, r)) = inflight.next().await {
active.retain(|x| *x != ip);
state.set_testing(&active);
if let Some(found) = classify(&state, ip, lat, r, explicit_target).await {
return Some(found);
}
}
state.begin_confirm();
while let Some((ip, lat)) = state.pop_confirm() {
state.set_testing(&[ip]);
let r =
speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await;
match r {
Ok(s) if s >= state.target() => {
state.mark_found(ip, s);
return Some(Found { ip, latency: lat, mbs: s });
}
Ok(s) => state.record_fail(ip, Some(s)),
Err(_) => state.record_fail(ip, None),
}
maybe_recalibrate(&state, explicit_target).await;
}
state.end_confirm();
active.clear();
continue;
}
// --- Screen phase: top the in-flight set up to CONCURRENCY. ---
while inflight.len() < CONCURRENCY {
match state.pop_queue() {
Some((ip, lat)) => {
active.push(ip);
inflight.push(screen(node.clone(), ip, lat));
}
None => break,
}
}
state.set_testing(&active);
if inflight.is_empty() {
if state.is_drained() {
return None;
}
tokio::time::sleep(POLL).await;
continue;
}
if let Some((ip, lat, r)) = inflight.next().await {
active.retain(|x| *x != ip);
state.set_testing(&active);
if let Some(found) = classify(&state, ip, lat, r, explicit_target).await {
return Some(found);
}
}
}
}
/// One concurrent screen: download through the node and carry the IP + latency
/// along with the result.
async fn screen(node: Arc<VlessNode>, ip: IpAddr, lat: Duration) -> (IpAddr, Duration, Result<f64>) {
let r = speed::measure_download(node.as_ref(), ip, secs(SPEED_TIMEOUT), SPEED_BYTES).await;
(ip, lat, r)
}
/// Sort a finished screen into one of three outcomes: a direct hit (≥ target,
/// done), a promising candidate (≥ target/n, queued for solo confirm), or a
/// failure (recorded; may trigger a direct-link recalibration).
async fn classify(
state: &State,
ip: IpAddr,
lat: Duration,
r: Result<f64>,
explicit_target: bool,
) -> Option<Found> {
let target = state.target();
let floor = target / CONCURRENCY as f64;
match r {
Ok(s) if s >= target => {
state.mark_found(ip, s);
return Some(Found { ip, latency: lat, mbs: s });
}
Ok(s) if s >= floor => state.push_confirm(ip, lat),
Ok(s) => {
state.record_fail(ip, Some(s));
maybe_recalibrate(state, explicit_target).await;
}
Err(_) => {
state.record_fail(ip, None);
maybe_recalibrate(state, explicit_target).await;
}
}
None
}
/// Every `FAIL_RECALIBRATE` failures, re-measure the direct link. If the local
/// link itself slowed down, a default (direct-derived) target would otherwise be
/// unreachable forever; an explicit --speed target is left untouched.
async fn maybe_recalibrate(state: &State, explicit_target: bool) {
if !state.recalibration_due(FAIL_RECALIBRATE) {
return;
}
state.begin_recalibrate();
let base = speed::measure_direct(secs(DIRECT_TIMEOUT), SPEED_BYTES).await;
// Recompute only a default (direct-derived) target; leave explicit ones be.
let new_target = base.ok().filter(|_| !explicit_target).map(|b| b * DEFAULT_SPEED_FRACTION);
state.end_recalibrate(new_target);
}
+172
View File
@@ -0,0 +1,172 @@
//! `easy`: find one IP fast enough to hit a target speed, then stop.
//!
//! A [`producer`] keeps a validated *queue* of IPs (those that pass ping +
//! latency) topped up between a low and high watermark. The [`consumer`] runs
//! `CONCURRENCY` independent screens off that queue: under n-way contention each
//! stream gets roughly link/n, so anything reaching `target/n` is worth a solo
//! confirm and anything already at `target` is a guaranteed pass. Promising IPs
//! go onto a separate confirm queue; once the in-flight screens wind down,
//! they're re-tested single-threaded (accurate, no contention) until one clears
//! `target`. Every so many failures we re-measure the direct link in case it
//! drifted. A fixed [`ui`] panel shows what's under test and the last few
//! results, sharing one mutex-guarded [`state`] bucket between the three tasks.
mod consumer;
mod producer;
mod state;
mod ui;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use anyhow::{Result, anyhow};
use console::style;
use indicatif::ProgressBar;
use crate::cli::EasyArgs;
use crate::cloudflare::{self, Family};
use crate::report::{print_easy_found, resolve_node, write_file};
use crate::speed;
use consumer::consume;
use producer::produce;
use state::{Found, State};
use ui::{Panel, render_loop};
// Fixed knobs — the point of `easy` is to not expose these.
const CONCURRENCY: usize = 5; // independent screens in flight, also the screen floor divisor
const QUEUE_HIGH: usize = 50; // validated-queue refill target
const QUEUE_LOW: usize = 30; // refill kicks in once the queue drops below this
const PRODUCE_BATCH: usize = 50; // IPs per discovery round — one full latency wave (matches LAT_CONCURRENCY)
const RECENT: usize = 3; // finished results kept on screen for review
const FEED: usize = 5; // live discovery lines (ping/latency) shown while finding IPs
const PING_CONCURRENCY: usize = 200;
const PING_TIMEOUT: f64 = 3.0;
const PING_MIN_MS: f64 = 10.0;
const PING_MAX_MS: f64 = 200.0;
const LAT_CONCURRENCY: usize = 50;
const LAT_TIMEOUT: f64 = 5.0;
const LAT_MAX_MS: f64 = 300.0;
const SPEED_TIMEOUT: f64 = 5.0;
const SPEED_BYTES: u64 = 50_000_000; // file is large; time/steady-state caps first
const MAX_BARREN_ROUNDS: usize = 8; // give up after this many empty discovery rounds
const DEFAULT_SPEED_FRACTION: f64 = 0.80; // default target = this × direct speed
const DIRECT_TIMEOUT: f64 = 8.0; // one longer direct measurement (the mirror is too bimodal to sample)
const FAIL_RECALIBRATE: usize = 100; // re-measure the direct link every this many failures
const POLL: Duration = Duration::from_millis(50); // producer/consumer idle poll
const FRAME: Duration = Duration::from_millis(120); // render tick
const FRAMES: [&str; 10] = ["", "", "", "", "", "", "", "", "", ""];
pub(crate) async fn run(args: EasyArgs) -> Result<()> {
let node = Arc::new(resolve_node(&args.node_file, &args.node)?);
if matches!(args.speed, Some(s) if s <= 0.0) {
return Err(anyhow!("--speed must be greater than 0"));
}
// 0. Direct (no-proxy) speed via a domestic mirror — the link ceiling, and
// the source of the default target when --speed is omitted. One longer
// (8 s) measurement: the mirror's throughput is too bimodal for repeated
// short samples to agree on.
let spinner = spin("Measuring direct (no-proxy) download speed…");
let baseline = speed::measure_direct(secs(DIRECT_TIMEOUT), SPEED_BYTES).await;
match &baseline {
Ok(mbs) => spinner.finish_with_message(format!(
"{} direct download speed: {} MB/s",
style("").green().bold(),
style(format!("{mbs:.2}")).cyan()
)),
Err(_) => spinner.finish_with_message(format!(
"{} direct speed unavailable (direct access blocked?)",
style("!").yellow().bold()
)),
}
// Resolve the target: explicit --speed, else a fraction of the measured
// direct speed. Reject a target the local link can't reach; with neither a
// target nor a measured link there's nothing to aim for.
let explicit_target = args.speed.is_some();
let (target, note) = match (args.speed, baseline) {
(Some(s), Ok(base)) if s > base => {
return Err(anyhow!(
"direct speed is only {base:.2} MB/s — can't find a node ≥ {s:.2} MB/s"
));
}
(Some(s), _) => (s, String::new()),
(None, Ok(base)) => (
base * DEFAULT_SPEED_FRACTION,
format!(" ({:.0}% of direct {base:.2})", DEFAULT_SPEED_FRACTION * 100.0),
),
(None, Err(_)) => {
return Err(anyhow!(
"couldn't measure direct speed to pick a default target — pass --speed <MB/s>"
));
}
};
let family = if args.ipv6 { Family::Both } else { Family::V4 };
let spinner = spin("Fetching Cloudflare ranges…");
let ranges = Arc::new(cloudflare::fetch_ranges(family).await?);
spinner.finish_with_message(format!(
"{} Cloudflare ranges: {} v4, {} v6",
style("").green().bold(),
ranges.v4.len(),
ranges.v6.len()
));
eprintln!(
"{} target ≥ {} MB/s{}, up to {} valid IPs",
style("easy").bold().cyan(),
style(format!("{target:.2}")).cyan(),
style(note).dim(),
args.max
);
let state = State::new(target);
let producer = tokio::spawn(produce(ranges.clone(), node.clone(), args.max, args.ipv6, state.clone()));
let stop = Arc::new(AtomicBool::new(false));
let ui = tokio::spawn(render_loop(state.clone(), Panel::new(), args.max, stop.clone()));
let hit = consume(node.clone(), state.clone(), explicit_target).await;
stop.store(true, Ordering::Relaxed);
let _ = ui.await; // render loop clears the view before returning
producer.abort();
match hit {
Some(Found { ip, latency, mbs }) => {
let alias = format!("{mbs:.2}MB-{:.0}ms-{}", latency.as_secs_f64() * 1000.0, ip);
let url = node.to_url(ip, &alias);
print_easy_found(ip, mbs, latency, &url);
write_file(&args.output.join("result.txt"), &format!("{url}\n"))?;
}
None => {
let (final_target, valid) = (state.target(), state.valid());
eprintln!(
"{} no IP reached {final_target:.2} MB/s after testing {valid} valid IP(s)",
style("").red().bold()
);
}
}
Ok(())
}
fn spin(msg: &str) -> ProgressBar {
let pb = ProgressBar::new_spinner();
pb.enable_steady_tick(Duration::from_millis(90));
pb.set_message(msg.to_string());
pb
}
fn secs(s: f64) -> Duration {
Duration::from_secs_f64(s)
}
fn millis(ms: f64) -> Duration {
Duration::from_secs_f64(ms / 1000.0)
}
+131
View File
@@ -0,0 +1,131 @@
//! The discovery producer: refill the validated queue between watermarks by
//! pipelining ping → latency, streaming each reachable IP straight into the
//! latency stage so both run at once instead of latency waiting for the whole
//! ping batch. Surplus from a round is kept, not discarded.
use std::net::IpAddr;
use std::sync::Arc;
use futures::channel::mpsc;
use futures::stream::StreamExt;
use crate::cloudflare::CfRanges;
use crate::latency::{self, LatStatus};
use crate::ping;
use crate::vless::VlessNode;
use super::state::State;
use super::ui::{lat_feed_line, ping_feed_line};
use super::{
LAT_CONCURRENCY, LAT_MAX_MS, LAT_TIMEOUT, MAX_BARREN_ROUNDS, PING_CONCURRENCY, PING_MAX_MS,
PING_MIN_MS, PING_TIMEOUT, POLL, PRODUCE_BATCH, QUEUE_HIGH, QUEUE_LOW, millis, secs,
};
/// Keep the validated queue topped up to `QUEUE_HIGH` once it drops below
/// `QUEUE_LOW`, idling otherwise, until `max_valid` are minted, the consumer
/// signals `found`, or the source dries up.
pub(super) async fn produce(
ranges: Arc<CfRanges>,
node: Arc<VlessNode>,
max_valid: usize,
ipv6: bool,
state: State,
) {
let mut barren = 0usize;
let mut refilling = true; // start by filling the empty queue
while let Some((q, valid)) = state.refill_check(max_valid) {
if refilling && q >= QUEUE_HIGH {
refilling = false;
} else if !refilling && q < QUEUE_LOW {
refilling = true;
barren = 0;
}
if !refilling {
tokio::time::sleep(POLL).await;
continue;
}
let batch = QUEUE_HIGH.saturating_sub(q).min(max_valid - valid).clamp(1, PRODUCE_BATCH);
let cfg = ping::PingConfig {
count: batch,
timeout: secs(PING_TIMEOUT),
concurrency: PING_CONCURRENCY,
ipv6,
port: 443,
max_probe: batch.saturating_mul(200),
times: 1,
min_latency: millis(PING_MIN_MS),
max_latency: millis(PING_MAX_MS),
max_loss: 0.0,
};
let minted = match discover_round(&ranges, &node, &cfg, max_valid, &state).await {
Ok(minted) => minted,
Err(()) => break,
};
if minted == 0 {
barren += 1;
if barren >= MAX_BARREN_ROUNDS {
break;
}
} else {
barren = 0;
}
}
state.set_producer_done();
}
/// One pipelined discovery round: ping streams each reachable IP into the
/// latency stage; passing IPs are enqueued. Returns how many were minted, or
/// `Err(())` if the ping stage failed outright.
async fn discover_round(
ranges: &CfRanges,
node: &Arc<VlessNode>,
cfg: &ping::PingConfig,
max_valid: usize,
state: &State,
) -> Result<usize, ()> {
let (tx, rx) = mpsc::unbounded::<IpAddr>();
let state_ping = state.clone();
let ping_side = async {
ping::run(ranges, cfg, move |p| match p {
ping::Progress::Probing { ip, status, latency, .. } => {
if let Some(line) = ping_feed_line(ip, &status, latency) {
state_ping.push_feed(line);
let _ = tx.unbounded_send(ip);
}
}
})
.await
};
let state_lat = state.clone();
let node_lat = node.clone();
let lat_side = async move {
let max_lat = millis(LAT_MAX_MS);
let mut minted = 0usize;
let mut results = rx
.map(|ip| {
let node = node_lat.clone();
async move { (ip, latency::measure(node.as_ref(), ip, secs(LAT_TIMEOUT)).await) }
})
.buffer_unordered(LAT_CONCURRENCY);
while let Some((ip, r)) = results.next().await {
let status = match r {
Ok(d) if max_lat.is_zero() || d <= max_lat => LatStatus::Ok(d),
Ok(d) => LatStatus::TooSlow(d),
Err(e) => LatStatus::Failed(e.to_string()),
};
state_lat.push_feed(lat_feed_line(ip, &status));
if let LatStatus::Ok(d) = status {
if state_lat.try_enqueue(ip, d, max_valid) {
minted += 1;
}
}
}
minted
};
let (ping_res, minted) = futures::join!(ping_side, lat_side);
ping_res.map(|_| minted).map_err(|_| ())
}
+223
View File
@@ -0,0 +1,223 @@
//! Shared, observable state for an `easy` run: the validated queue, the confirm
//! queue, and the live testing/feed/recent views, all behind one [`State`]
//! handle whose methods are the *only* way to touch the bucket. A plain mutex
//! (never held across `.await`) is enough: producer and consumer drive it
//! through these methods, the renderer reads a [`Snapshot`].
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use super::{FEED, RECENT};
/// The winning IP: address, carried latency, measured speed (MB/s).
pub(super) struct Found {
pub(super) ip: IpAddr,
pub(super) latency: Duration,
pub(super) mbs: f64,
}
/// A point-in-time copy of everything the renderer draws.
pub(super) struct Snapshot {
pub(super) testing: Vec<IpAddr>,
pub(super) feed: Vec<String>,
pub(super) recent: Vec<(IpAddr, Option<f64>, bool)>, // newest first
pub(super) valid: usize,
pub(super) queued: usize,
pub(super) confirming: bool,
pub(super) recalibrating: bool,
}
/// One finished speed test, kept for the "recent" panel.
struct Finished {
ip: IpAddr,
mbs: Option<f64>,
pass: bool,
}
struct Bucket {
queue: VecDeque<(IpAddr, Duration)>, // validated, awaiting a concurrent screen
confirm: VecDeque<(IpAddr, Duration)>, // promising (≥ target/n), awaiting a solo confirm
testing: Vec<IpAddr>, // IPs under test right now (screen or confirm)
feed: VecDeque<String>, // last FEED discovery lines (producer's ping/latency)
recent: VecDeque<Finished>, // last RECENT finished, oldest first
valid: usize, // total minted (gates --max + header)
fails: usize, // tested-and-rejected count (drives recalibration)
target: f64, // current target MB/s (recalibrated mid-run)
confirming: bool, // a solo confirm is running (pauses screens, tints UI)
recalibrating: bool, // a direct re-measure is running
producer_done: bool,
found: bool,
}
/// Cloneable handle to the shared bucket. All locking lives here.
#[derive(Clone)]
pub(super) struct State(Arc<Mutex<Bucket>>);
impl State {
/// A fresh state aimed at `target` MB/s.
pub(super) fn new(target: f64) -> Self {
State(Arc::new(Mutex::new(Bucket {
queue: VecDeque::new(),
confirm: VecDeque::new(),
testing: Vec::new(),
feed: VecDeque::new(),
recent: VecDeque::new(),
valid: 0,
fails: 0,
target,
confirming: false,
recalibrating: false,
producer_done: false,
found: false,
})))
}
fn lock(&self) -> MutexGuard<'_, Bucket> {
self.0.lock().unwrap()
}
// --- reads ---
pub(super) fn target(&self) -> f64 {
self.lock().target
}
pub(super) fn valid(&self) -> usize {
self.lock().valid
}
pub(super) fn confirm_pending(&self) -> bool {
!self.lock().confirm.is_empty()
}
/// True once the producer is finished and nothing remains to test.
pub(super) fn is_drained(&self) -> bool {
let b = self.lock();
b.producer_done && b.queue.is_empty() && b.confirm.is_empty()
}
/// Whether a direct-link recalibration is due (every `every` failures).
pub(super) fn recalibration_due(&self, every: usize) -> bool {
let b = self.lock();
b.fails > 0 && b.fails % every == 0 && !b.found
}
// --- consumer ---
pub(super) fn pop_queue(&self) -> Option<(IpAddr, Duration)> {
self.lock().queue.pop_front()
}
pub(super) fn pop_confirm(&self) -> Option<(IpAddr, Duration)> {
self.lock().confirm.pop_front()
}
pub(super) fn push_confirm(&self, ip: IpAddr, lat: Duration) {
self.lock().confirm.push_back((ip, lat));
}
pub(super) fn set_testing(&self, ips: &[IpAddr]) {
self.lock().testing = ips.to_vec();
}
pub(super) fn begin_confirm(&self) {
self.lock().confirming = true;
}
pub(super) fn end_confirm(&self) {
let mut b = self.lock();
b.confirming = false;
b.testing.clear();
}
/// Mark the search won and record the winner in the recent panel.
pub(super) fn mark_found(&self, ip: IpAddr, mbs: f64) {
let mut b = self.lock();
b.found = true;
push_recent(&mut b, ip, Some(mbs), true);
b.testing.clear();
}
/// Record a rejected IP and bump the failure counter.
pub(super) fn record_fail(&self, ip: IpAddr, mbs: Option<f64>) {
let mut b = self.lock();
b.fails += 1;
push_recent(&mut b, ip, mbs, false);
}
pub(super) fn begin_recalibrate(&self) {
let mut b = self.lock();
b.recalibrating = true;
b.testing.clear();
}
pub(super) fn end_recalibrate(&self, new_target: Option<f64>) {
let mut b = self.lock();
b.recalibrating = false;
if let Some(t) = new_target {
b.target = t;
}
}
// --- producer ---
/// Stop signal plus a one-lock read of `(queue_len, valid)`; `None` means the
/// producer should stop (found, or the valid cap is reached).
pub(super) fn refill_check(&self, max_valid: usize) -> Option<(usize, usize)> {
let b = self.lock();
if b.found || b.valid >= max_valid {
None
} else {
Some((b.queue.len(), b.valid))
}
}
/// Enqueue a validated IP unless we've already stopped; returns whether it
/// was minted.
pub(super) fn try_enqueue(&self, ip: IpAddr, lat: Duration, max_valid: usize) -> bool {
let mut b = self.lock();
if b.found || b.valid >= max_valid {
return false;
}
b.queue.push_back((ip, lat));
b.valid += 1;
true
}
/// Push one line onto the bounded discovery feed (newest last).
pub(super) fn push_feed(&self, line: String) {
let mut b = self.lock();
b.feed.push_back(line);
while b.feed.len() > FEED {
b.feed.pop_front();
}
}
pub(super) fn set_producer_done(&self) {
self.lock().producer_done = true;
}
// --- renderer ---
pub(super) fn snapshot(&self) -> Snapshot {
let b = self.lock();
Snapshot {
testing: b.testing.clone(),
feed: b.feed.iter().cloned().collect(),
recent: b.recent.iter().rev().map(|f| (f.ip, f.mbs, f.pass)).collect(),
valid: b.valid,
queued: b.queue.len(),
confirming: b.confirming,
recalibrating: b.recalibrating,
}
}
}
fn push_recent(b: &mut Bucket, ip: IpAddr, mbs: Option<f64>, pass: bool) {
b.recent.push_back(Finished { ip, mbs, pass });
while b.recent.len() > RECENT {
b.recent.pop_front();
}
}
+189
View File
@@ -0,0 +1,189 @@
//! The fixed in-place panel — header, discovery feed, testing rows, recent rows
//! — and the line formatters that feed it.
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use console::{Style, style};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use crate::latency::LatStatus;
use crate::ping;
use super::state::{Snapshot, State};
use super::{CONCURRENCY, FEED, FRAME, FRAMES, QUEUE_LOW, RECENT};
/// Periodically redraw the panel until `stop`, then clear it.
pub(super) async fn render_loop(state: State, panel: Panel, max: usize, stop: Arc<AtomicBool>) {
let mut tick = 0usize;
loop {
panel.render(&state, max, tick);
if stop.load(Ordering::Relaxed) {
break;
}
tick += 1;
tokio::time::sleep(FRAME).await;
}
panel.clear();
}
/// The fixed in-place panel: a header, `CONCURRENCY` "testing" rows, and
/// `RECENT` "recent result" rows — each an indicatif line whose message we set.
pub(super) struct Panel {
_mp: MultiProgress,
header: ProgressBar,
discover_label: ProgressBar,
discover: Vec<ProgressBar>,
testing_label: ProgressBar,
testing: Vec<ProgressBar>,
recent_label: ProgressBar,
recent: Vec<ProgressBar>,
}
impl Panel {
pub(super) fn new() -> Self {
let mp = MultiProgress::new();
let line = |mp: &MultiProgress| {
let pb = mp.add(ProgressBar::new(1));
pb.set_style(ProgressStyle::with_template("{msg}").unwrap());
pb
};
let header = line(&mp);
let discover_label = line(&mp);
let discover = (0..FEED).map(|_| line(&mp)).collect();
let testing_label = line(&mp);
let testing = (0..CONCURRENCY).map(|_| line(&mp)).collect();
let recent_label = line(&mp);
let recent = (0..RECENT).map(|_| line(&mp)).collect();
testing_label.set_message(format!(" {}", style("testing").dim()));
recent_label.set_message(format!(" {}", style("recent").dim()));
Self { _mp: mp, header, discover_label, discover, testing_label, testing, recent_label, recent }
}
fn render(&self, state: &State, max: usize, tick: usize) {
let frame = FRAMES[tick % FRAMES.len()];
let Snapshot { testing, feed, recent, valid, queued, confirming, recalibrating } =
state.snapshot();
// Header: the search budget remaining (max valid IPs minus those minted).
// It counts down to 0 — reaching 0 means the search gave up.
if recalibrating {
self.header
.set_message(format!("{} re-measuring direct speed…", style(frame).yellow()));
} else {
// Budget remaining: green when plenty, fading to red as it nears 0.
let remaining = max.saturating_sub(valid);
let ratio = if max == 0 { 0.0 } else { remaining as f64 / max as f64 };
let remaining = Style::new().color256(health_color(ratio)).bold().apply_to(remaining);
self.header
.set_message(format!("{} finding fast IPs… {}", style(frame).cyan(), remaining));
}
// Discovery label carries the live queue depth: red when near-empty,
// greening as it climbs to the refill watermark (QUEUE_LOW). Dimmed so it
// reads as a soft secondary indicator.
let q_ratio = queued as f64 / QUEUE_LOW as f64;
let queued = Style::new().color256(health_color(q_ratio)).dim().apply_to(queued);
self.discover_label
.set_message(format!(" {} {}", style("discovering").dim(), queued));
// Discovery feed: the ping + latency live log, scrolling (newest last).
for (i, pb) in self.discover.iter().enumerate() {
match feed.get(i) {
Some(line) => pb.set_message(line.clone()),
None => pb.set_message(String::new()),
}
}
// Testing rows: each IP currently under test (screen, or solo confirm).
let verb = if confirming { "confirming…" } else { "testing…" };
for (i, pb) in self.testing.iter().enumerate() {
match testing.get(i) {
Some(ip) => {
let label = if confirming {
style(verb).magenta()
} else {
style(verb).cyan()
};
pb.set_message(format!(" {} {:<15} {}", style(frame).cyan(), ip.to_string(), label));
}
None => pb.set_message(String::new()),
}
}
// Recent results, newest first.
for (i, pb) in self.recent.iter().enumerate() {
match recent.get(i) {
Some((ip, mbs, pass)) => {
let mark = if *pass { style("").green() } else { style("").red() };
let value = match mbs {
Some(s) => format!("{s:.2} MB/s"),
None => "failed".to_string(),
};
let value = if *pass { style(value).green() } else { style(value).dim() };
pb.set_message(format!(" {mark} {:<15} {value}", ip.to_string()));
}
None => pb.set_message(String::new()),
}
}
}
fn clear(&self) {
for pb in std::iter::once(&self.header)
.chain(std::iter::once(&self.discover_label))
.chain(self.discover.iter())
.chain(std::iter::once(&self.testing_label))
.chain(self.testing.iter())
.chain(std::iter::once(&self.recent_label))
.chain(self.recent.iter())
{
pb.finish_and_clear();
}
}
}
/// Map a 0.0 (worst) .. 1.0 (best) ratio to a green→red ANSI-256 colour, with
/// enough stops to read as a gradient rather than a few coarse buckets.
fn health_color(ratio: f64) -> u8 {
// A clean green→yellow→red path through the 6×6×6 colour cube (red first).
const RAMP: [u8; 11] = [196, 202, 208, 214, 220, 226, 190, 154, 118, 82, 46];
let i = (ratio.clamp(0.0, 1.0) * (RAMP.len() - 1) as f64).round() as usize;
RAMP[i]
}
/// A ping-stage feed line for an IP that passed the filters
/// (`[ping] <ip> <rtt>`); `None` for the unreachable/filtered firehose.
pub(super) fn ping_feed_line(ip: IpAddr, status: &ping::ProbeStatus, latency: Option<Duration>) -> Option<String> {
if !matches!(status, ping::ProbeStatus::Ok) {
return None;
}
let ms = format!("{:.0}ms", latency?.as_secs_f64() * 1000.0);
Some(format!(
" {} {} {}",
style(format!("{:<9}", "[ping]")).blue().dim(),
style(format!("{:<15}", ip.to_string())).dim(),
style(ms).green().dim(),
))
}
/// A latency-stage feed line (`[latency] <ip> <rtt|reason>`), dimmed like the
/// `latency` command's live log.
pub(super) fn lat_feed_line(ip: IpAddr, status: &LatStatus) -> String {
let tag = style(format!("{:<9}", "[latency]")).cyan().dim();
let addr = style(format!("{:<15}", ip.to_string())).dim();
match status {
LatStatus::Ok(d) => {
let ms = format!("{:.0}ms", d.as_secs_f64() * 1000.0);
format!(" {tag} {addr} {}", style(ms).green().dim())
}
LatStatus::TooSlow(d) => {
let ms = format!("{:.0}ms", d.as_secs_f64() * 1000.0);
format!(" {tag} {addr} {}", style(format!("{ms} too slow")).red().dim())
}
LatStatus::Failed(why) => {
format!(" {tag} {addr} {}", style(why.clone()).red().dim())
}
}
}