feat: stream ping probes and enrich live progress

- replace the two-phase availability pre-scan with a single-stage
  continuous stream that round-robins segments and keeps concurrency
  probes in flight, so a slow unreachable IP only blocks its own slot
- drop the now-unused --probe-sample knob
- show per-probe reasons (unreachable / too fast / too slow / loss) in
  the live feed across all stages
- cut the auto speed timeout to 5s
This commit is contained in:
chuan
2026-06-23 23:12:09 +08:00
Unverified
parent 7dec1e0479
commit ffe32f4dac
5 changed files with 95 additions and 153 deletions
-1
View File
@@ -113,7 +113,6 @@ auto 使用各阶段默认的超时、并发、丢包等参数;需精调时单
| `-c, --concurrency` | 100 | 并发数 |
| `-6, --ipv6` | off | 同时测 IPv6 |
| `-p, --port` | 443 | 探测端口 |
| `--probe-sample` | 5 | 每段试探抽样数 |
| `--max-probe` | count×100 | 探测预算上限 |
| `--times` | 1 | 每 IP 探测次数(`>1` 启用丢包过滤) |
| `--min` / `--max` | 10 / 300 | 平均延迟区间 ms |
-4
View File
@@ -105,10 +105,6 @@ pub(crate) struct PingArgs {
#[arg(short = 'p', long, default_value_t = 443)]
pub(crate) port: u16,
/// Random IPs sampled per segment in the availability pre-scan.
#[arg(long, default_value_t = 5)]
pub(crate) probe_sample: usize,
/// Safety cap: stop after probing this many IPs. Defaults to count * 100.
#[arg(long)]
pub(crate) max_probe: Option<usize>,
+28 -21
View File
@@ -9,7 +9,7 @@ use indicatif::ProgressBar;
use crate::cli::{AutoArgs, ExportArgs, LatencyArgs, PingArgs, SpeedArgs};
use crate::cloudflare::{self, Family};
use crate::ping::{self, Progress};
use crate::ping::{self, Progress, ProbeStatus};
use crate::report::{
LiveProgress, print_speed_table, print_table, read_export_rows, read_ip_list, read_latency_csv,
resolve_node, write_csv, write_file, write_ips, write_speed_csv,
@@ -20,11 +20,10 @@ use crate::{latency, speed};
// Stage defaults the auto pipeline uses for the knobs it doesn't expose.
const PING_TIMEOUT: f64 = 3.0;
const PING_CONCURRENCY: usize = 100;
const PING_PROBE_SAMPLE: usize = 5;
const PING_MIN_LATENCY_MS: f64 = 10.0;
const LAT_CONCURRENCY: usize = 50;
const LAT_TIMEOUT: f64 = 5.0;
const SPEED_TIMEOUT: f64 = 10.0;
const SPEED_TIMEOUT: f64 = 5.0;
const SPEED_BYTES: u64 = 10_000_000;
// ---- individual subcommands ------------------------------------------------
@@ -36,7 +35,6 @@ pub(crate) async fn run_ping(args: PingArgs) -> Result<()> {
concurrency: args.concurrency.max(1),
ipv6: args.ipv6,
port: args.port,
probe_sample: args.probe_sample,
max_probe: args.max_probe.unwrap_or(args.count.saturating_mul(100)),
times: args.times.max(1),
min_latency: Duration::from_secs_f64(args.min_latency / 1000.0),
@@ -157,7 +155,6 @@ pub(crate) async fn run_auto(args: AutoArgs) -> Result<()> {
concurrency: PING_CONCURRENCY,
ipv6: args.ipv6,
port: 443,
probe_sample: PING_PROBE_SAMPLE,
max_probe: args.count.saturating_mul(100),
times: 1,
min_latency: Duration::from_secs_f64(PING_MIN_LATENCY_MS / 1000.0),
@@ -251,18 +248,25 @@ async fn stage_ping(cfg: &ping::PingConfig) -> Result<Vec<ping::PingResult>> {
let target = cfg.count;
let mut progress = LiveProgress::new(cfg.count as u64, "valid");
let results = ping::run(&ranges, cfg, |event| match event {
Progress::Probing { ip, ok, valid, probed } => {
let mark = if ok { style("").green() } else { style("x").red() };
progress.push(valid.min(target) as u64, probed, format!("{mark} {ip}"));
}
Progress::PreScan { live, segments, valid } => {
progress.println(format!(
" pre-scan: {} live / {} segments, {} already valid",
style(live).cyan(),
segments,
style(valid).green(),
));
progress.set_position(valid.min(target) as u64);
Progress::Probing { ip, status, latency, valid, probed } => {
let addr = style(format!("{:<15}", ip.to_string())).dim();
let ms = latency.map(|d| format!("{:.0}ms", d.as_secs_f64() * 1000.0)).unwrap_or_default();
let line = match status {
ProbeStatus::Ok => format!("{} {addr} {}", style("").green().dim(), style(ms).green().dim()),
ProbeStatus::Unreachable => {
format!("{} {addr} {}", style("x").red().dim(), style("unreachable").dim())
}
ProbeStatus::TooFast => {
format!("{} {addr} {}", style("x").red().dim(), style(format!("{ms} too fast")).dim())
}
ProbeStatus::TooSlow => {
format!("{} {addr} {}", style("x").red().dim(), style(format!("{ms} too slow")).dim())
}
ProbeStatus::Lossy => {
format!("{} {addr} {}", style("x").red().dim(), style("loss").dim())
}
};
progress.push(valid.min(target) as u64, probed, line);
}
})
.await?;
@@ -282,12 +286,14 @@ async fn stage_latency(
let mut progress = LiveProgress::new(ips.len() as u64, "tested");
let mut kept = 0usize;
let results = latency::run(node, ips, concurrency, timeout, max_latency, top, |p| {
let addr = style(format!("{:<15}", p.ip.to_string())).dim();
let line = match p.latency {
Some(d) => {
kept += 1;
format!("{} {} {:.0}ms", style("").green(), p.ip, d.as_secs_f64() * 1000.0)
let ms = format!("{:.0}ms", d.as_secs_f64() * 1000.0);
format!("{} {addr} {}", style("").green().dim(), style(ms).green().dim())
}
None => format!("{} {}", style("x").red(), p.ip),
None => format!("{} {addr} {}", style("x").red().dim(), style("failed").dim()),
};
progress.push(p.done as u64, kept, line);
})
@@ -310,12 +316,13 @@ async fn stage_speed(
let mut kept = 0usize;
let mut results =
speed::run(node, inputs, concurrency, timeout, bytes, min_speed, |p| {
let addr = style(format!("{:<15}", p.ip.to_string())).dim();
let line = match p.speed_mbps {
Some(mbps) => {
kept += 1;
format!("{} {} {:.2}Mbps", style("").green(), p.ip, mbps)
format!("{} {addr} {}", style("").green().dim(), style(format!("{mbps:.2}Mbps")).green().dim())
}
None => format!("{} {}", style("x").red(), p.ip),
None => format!("{} {addr} {}", style("x").red().dim(), style("failed").dim()),
};
progress.push(p.done as u64, kept, line);
})
+67 -119
View File
@@ -6,14 +6,12 @@
//! the host answers pings — a different thing, often throttled or blocked —
//! and would need raw sockets or an external `ping` binary.
//!
//! Flow:
//! Phase A — availability pre-scan: sample a few IPs per segment, keep only
//! segments that answer (CF segment usability is very uneven).
//! Phase B — collection: sample uniformly across live segments and probe in
//! concurrent batches until `count` valid IPs are gathered (or the
//! `max_probe` budget is hit), then return the lowest-latency ones.
//! Flow: each round samples one random host from every segment, probes them
//! concurrently, and keeps the ones that pass the filters — repeating until
//! `count` valid IPs are gathered (or the `max_probe` budget is hit), then
//! returns the lowest-latency ones.
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::{Duration, Instant};
@@ -21,7 +19,6 @@ use anyhow::Result;
use futures::stream::{self, StreamExt};
use ipnet::IpNet;
use rand::Rng;
use rand::seq::SliceRandom;
use tokio::net::TcpStream;
use tokio::time::timeout;
@@ -38,9 +35,23 @@ pub struct PingResult {
/// module free of any console/progress-bar dependency.
pub enum Progress {
/// A single IP just finished probing (emitted for every probe, live feed).
Probing { ip: IpAddr, ok: bool, valid: usize, probed: usize },
/// Availability pre-scan finished.
PreScan { live: usize, segments: usize, valid: usize },
Probing {
ip: IpAddr,
status: ProbeStatus,
/// Average latency, present when the IP was reachable at all.
latency: Option<Duration>,
valid: usize,
probed: usize,
},
}
/// Why an IP did (or didn't) pass the filters.
pub enum ProbeStatus {
Ok,
Unreachable,
TooFast,
TooSlow,
Lossy,
}
/// Parameters for the ping stage.
@@ -55,8 +66,6 @@ pub struct PingConfig {
pub ipv6: bool,
/// TCP port to probe.
pub port: u16,
/// IPs sampled per segment during the availability pre-scan.
pub probe_sample: usize,
/// Hard cap on how many IPs we probe before giving up.
pub max_probe: usize,
/// TCP probes per IP. >1 makes loss filtering meaningful.
@@ -76,21 +85,28 @@ struct ProbeOutcome {
}
impl PingConfig {
/// Whether an outcome passes the latency-range and loss filters.
fn passes(&self, o: &ProbeOutcome) -> bool {
/// Classify an outcome against the latency-range and loss filters,
/// returning the first failing reason (or `Ok`).
fn classify(&self, o: &ProbeOutcome) -> ProbeStatus {
if o.received == 0 {
return false;
return ProbeStatus::Unreachable;
}
let loss = (self.times - o.received) as f64 / self.times as f64;
loss <= self.max_loss
&& o.avg_latency >= self.min_latency
&& o.avg_latency <= self.max_latency
if loss > self.max_loss {
ProbeStatus::Lossy
} else if o.avg_latency < self.min_latency {
ProbeStatus::TooFast
} else if o.avg_latency > self.max_latency {
ProbeStatus::TooSlow
} else {
ProbeStatus::Ok
}
}
}
/// Run the ping stage and return collected valid IPs, lowest latency first.
///
/// `on_progress` is invoked between probe batches with [`Progress`] events.
/// `on_progress` is invoked for every probe with a [`Progress`] event.
pub async fn run(
ranges: &CfRanges,
cfg: &PingConfig,
@@ -104,95 +120,46 @@ pub async fn run(
return Ok(Vec::new());
}
let mut seen: HashSet<IpAddr> = HashSet::new();
let mut collected: Vec<PingResult> = Vec::new();
// Phase A: availability pre-scan, learn which segments are live.
let mut a_samples: Vec<(usize, IpAddr)> = Vec::new();
{
let mut rng = rand::thread_rng();
for (idx, net) in nets.iter().enumerate() {
for _ in 0..cfg.probe_sample {
let ip = random_host(net, &mut rng);
if seen.insert(ip) {
a_samples.push((idx, ip));
}
}
}
}
let a_ips: Vec<IpAddr> = a_samples.iter().map(|(_, ip)| *ip).collect();
let ip_to_idx: HashMap<IpAddr, usize> = a_samples.iter().map(|(i, ip)| (*ip, *i)).collect();
let mut probed = 0usize;
// A segment is "live" if any sampled IP was reachable at all; an IP joins
// the results only if it also passes the latency/loss filters.
let mut live_idx: HashSet<usize> = HashSet::new();
probe_each(a_ips, cfg.port, cfg.times, cfg.timeout, cfg.concurrency, |ip, outcome| {
probed += 1;
if outcome.received > 0 {
if let Some(idx) = ip_to_idx.get(&ip) {
live_idx.insert(*idx);
let (port, times, to, concurrency) = (cfg.port, cfg.times, cfg.timeout, cfg.concurrency.max(1));
// Lazily generate fresh random hosts, round-robin across all segments. A
// continuous stream (vs discrete rounds) keeps `concurrency` probes always
// in flight, so a slow unreachable IP only blocks its own slot.
let mut seen: HashSet<IpAddr> = HashSet::new();
let mut rng = rand::thread_rng();
let mut idx = 0usize;
let candidates = std::iter::from_fn(move || {
let max_tries = nets.len().saturating_mul(50).max(1);
for _ in 0..max_tries {
let net = &nets[idx % nets.len()];
idx = idx.wrapping_add(1);
let ip = random_host(net, &mut rng);
if seen.insert(ip) {
return Some(ip);
}
}
let ok = cfg.passes(&outcome);
if ok {
collected.push(PingResult { ip, latency: outcome.avg_latency });
}
on_progress(Progress::Probing { ip, ok, valid: collected.len(), probed });
})
.await;
let live_nets: Vec<IpNet> = nets
.iter()
.enumerate()
.filter(|(idx, _)| live_idx.contains(idx))
.map(|(_, net)| *net)
.collect();
on_progress(Progress::PreScan {
live: live_nets.len(),
segments: nets.len(),
valid: collected.len(),
None // every segment effectively exhausted
});
if live_nets.is_empty() {
return Ok(finalize(collected, cfg.count));
}
let mut probes = stream::iter(
candidates.map(|ip| async move { (ip, probe_ip(ip, port, times, to).await) }),
)
.buffer_unordered(concurrency);
// Phase B: collect until we have `count` valid IPs (or run out of budget).
while collected.len() < cfg.count && probed < cfg.max_probe {
let want = cfg.count - collected.len();
let budget = cfg.max_probe - probed;
let batch_size = (want * 2).max(cfg.concurrency).min(budget);
let batch = {
let mut rng = rand::thread_rng();
let mut cand: Vec<IpAddr> = Vec::with_capacity(batch_size);
let mut tries = 0;
let max_tries = batch_size.saturating_mul(20).max(1);
while cand.len() < batch_size && tries < max_tries {
if let Some(net) = live_nets.choose(&mut rng) {
let ip = random_host(net, &mut rng);
if seen.insert(ip) {
cand.push(ip);
}
}
tries += 1;
}
cand
};
if batch.is_empty() {
while let Some((ip, outcome)) = probes.next().await {
probed += 1;
let status = cfg.classify(&outcome);
let latency = (outcome.received > 0).then_some(outcome.avg_latency);
if matches!(status, ProbeStatus::Ok) {
collected.push(PingResult { ip, latency: outcome.avg_latency });
}
on_progress(Progress::Probing { ip, status, latency, valid: collected.len(), probed });
if collected.len() >= cfg.count || probed >= cfg.max_probe {
break;
}
probe_each(batch, cfg.port, cfg.times, cfg.timeout, cfg.concurrency, |ip, outcome| {
probed += 1;
let ok = cfg.passes(&outcome);
if ok {
collected.push(PingResult { ip, latency: outcome.avg_latency });
}
on_progress(Progress::Probing { ip, ok, valid: collected.len(), probed });
})
.await;
}
Ok(finalize(collected, cfg.count))
@@ -205,25 +172,6 @@ fn finalize(mut results: Vec<PingResult>, count: usize) -> Vec<PingResult> {
results
}
/// Probe a batch of IPs with bounded concurrency, invoking `on_each` with each
/// IP's aggregated outcome as soon as it finishes (drives the live feed).
async fn probe_each(
ips: Vec<IpAddr>,
port: u16,
times: usize,
to: Duration,
concurrency: usize,
mut on_each: impl FnMut(IpAddr, ProbeOutcome),
) {
let mut probes = stream::iter(
ips.into_iter().map(|ip| async move { (ip, probe_ip(ip, port, times, to).await) }),
)
.buffer_unordered(concurrency.max(1));
while let Some((ip, outcome)) = probes.next().await {
on_each(ip, outcome);
}
}
/// Probe one IP `times` times, returning how many succeeded and the average
/// latency over the successful probes.
async fn probe_ip(ip: IpAddr, port: u16, times: usize, to: Duration) -> ProbeOutcome {
-8
View File
@@ -84,14 +84,6 @@ impl LiveProgress {
}
}
pub(crate) fn set_position(&self, position: u64) {
self.bar.set_position(position);
}
pub(crate) fn println(&self, msg: String) {
self.bar.println(msg);
}
pub(crate) fn finish(self) {
self.bar.finish_and_clear();
for pb in &self.logs {