feat: init

This commit is contained in:
2026-06-23 13:09:37 +08:00
Unverified
commit 848836fb5f
9 changed files with 1538 additions and 0 deletions
+34
View File
@@ -0,0 +1,34 @@
# Python
__pycache__/
*.py[cod]
*.pyo
*.pyd
.python-version
# Virtual environments
.venv/
venv/
env/
# Build and packaging
build/
dist/
*.egg-info/
# Local tools and binaries
tools/
*.exe
# Runtime outputs
txt/auto_01_ping_ips.txt
txt/auto_02_latency.txt
txt/available_cloudflare_ipv4_groups.txt
txt/cfst_result.csv
txt/fast_cloudflare_ipv4_ips.txt
txt/result.txt
txt/xrayst_result.csv
# Logs and local config
*.log
.env
.env.*
+221
View File
@@ -0,0 +1,221 @@
# Cloudflare VLESS IP Tester
一个面向 VLESS + WS + TLS 节点的 Cloudflare IP 筛选与真实链路测速工具。
项目分为两类能力:
- `cfst`:从 Cloudflare IPv4 段中随机抽样,筛出 TCP 可连通 IP。
- `auto` / `xrayst`:基于本地 Xray 内核,将候选 IP 写入 VLESS outbound,测试真实代理链路的延迟和下载速度。
最终输出是可直接导入客户端的 VLESS 节点列表,节点别名格式为 `速度-延迟-ip`,例如 `12.34M-156ms-103.21.244.162`
## Requirements
| 依赖 | 说明 |
| --- | --- |
| Python | `3.13+` |
| uv | Python 项目与依赖管理 |
| Xray | 默认路径 `tools/xray.exe` |
初始化依赖:
```powershell
uv sync
```
查看入口命令:
```powershell
uv run auto
uv run cfst
uv run xrayst
```
## Quick Start
Windows 下推荐直接使用 `run.bat`
```powershell
.\run.bat "vless://你的节点"
```
`run.bat` 只接受一个 VLESS 参数,并固定执行:
- 第一阶段随机抽取 `3000` 个 IP,其他参数使用 `auto` 默认值。
- 第二阶段保留真实延迟前 `200` 个,其他参数使用 `auto` 默认值。
- 第三阶段使用第二阶段全部结果测速,最终保留前 `10` 个,并发 `5`timeout `10s`
完整流程:
```powershell
uv run auto 'vless://你的节点' -r 1000 -p 10 -c 20 --speed-count 50
```
这条命令会执行:
- 随机抽取 `1000` 个 Cloudflare IP。
- 第一阶段用 TCP 443 探测连通性。
- 第二阶段用 Xray 测真实代理延迟,并保留延迟最低的 `10` 个。
- 第三阶段从延迟结果中取前 `50` 个做真实下载测速。
- 最终按速度档位和延迟排序,输出前 `10` 个节点到 `txt/result.txt`
## Pipeline
`auto.py` 支持分阶段运行,前一阶段输出会作为后一阶段输入。
| 阶段 | 命令 | 输入 | 输出 |
| --- | --- | --- | --- |
| `ping` | `uv run auto --stage ping -r 1000 -c 20` | Cloudflare IPv4 段 | `txt/auto_01_ping_ips.txt` |
| `latency` | `uv run auto --stage latency 'vless://你的节点' -p 200 -c 20` | `txt/auto_01_ping_ips.txt` | `txt/auto_02_latency.txt` |
| `speed` | `uv run auto --stage speed 'vless://你的节点' --speed-count 50 -p 10` | `txt/auto_02_latency.txt` | `txt/result.txt` |
### Stage 1: Ping
随机抽取 Cloudflare IP,并测试 TCP 443 是否可连通。
```powershell
uv run auto --stage ping -r 1000 -c 20
```
行为:
- 通过的 IP 会实时追加写入 `txt/auto_01_ping_ips.txt`
- 控制台会实时输出 `OK``FAIL`
- 这一阶段不需要 VLESS 节点。
### Stage 2: Latency
读取第一阶段输出,通过 Xray 临时代理测试真实 VLESS 链路延迟。
```powershell
uv run auto --stage latency 'vless://你的节点' -p 200 -c 20
```
配置:
| 项 | 值 |
| --- | --- |
| 输入文件 | `txt/auto_01_ping_ips.txt` |
| 输出文件 | `txt/auto_02_latency.txt` |
| 测试地址 | `https://www.google.com/generate_204` |
| Timeout | `3s` |
| 排序规则 | 延迟低的优先 |
`-p` 控制这一阶段保留多少个结果。
### Stage 3: Speed
读取第二阶段输出,通过 Xray 临时代理测试真实下载速度。
```powershell
uv run auto --stage speed 'vless://你的节点' --speed-count 50 -p 10 --speed-concurrency 1 --speed-timeout 10
```
配置:
| 项 | 值 |
| --- | --- |
| 输入文件 | `txt/auto_02_latency.txt` |
| 输出文件 | `txt/result.txt` |
| 下载地址 | `https://cachefly.cachefly.net/50mb.test` |
| 实际读取 | `10MB` |
| Timeout | `10s` |
排序规则:
- 速度每 `1 Mbps` 一个档位。
- 速度档位高的优先。
- 同档位按延迟低的优先。
参数关系:
- `--speed-count`:从第二阶段结果中取多少个 IP 做速度测试,`0` 表示全部。
- `--speed-concurrency`:第三阶段速度测试并发数,默认 `1`
- `--speed-timeout`:第三阶段单个下载测速 timeout,默认 `10s`
- `-p`:最终写入 `txt/result.txt` 的节点数量。
## VLESS Rewrite
输入节点示例:
- `vless://uuid@xxxxxxx:443?encryption=none&security=tls&type=ws&path=%2Fpath#name`
测试某个候选 IP 时,工具会生成等价 Xray outbound
| 字段 | 值 |
| --- | --- |
| outbound address | 候选 IP |
| outbound port | `443` |
| TLS SNI | 原始域名 |
| WebSocket Host | 原始域名 |
| WebSocket path | 原始 path |
最终节点示例:
- `vless://uuid@xxxx-ip:443?encryption=none&security=tls&insecure=0&allowInsecure=0&type=ws&host=xxxx&path=%2Fpath#12.34M-156ms-xxxx-ip`
## Commands
### auto
主入口,支持完整流程和分阶段运行。
```powershell
uv run auto [node] [options]
```
常用参数:
| 参数 | 默认值 | 说明 |
| --- | --- | --- |
| `--stage` | `all` | 可选 `all``ping``latency``speed` |
| `-r, --random-count` | `100` | 第一阶段随机抽取的 IP 数量 |
| `-p, --top` | `10` | 当前阶段保留数量;speed 阶段表示最终输出数量 |
| `-c, --concurrency` | `20` | ping 和 latency 阶段并发数 |
| `--speed-count` | `0` | speed 阶段取多少个延迟结果测试速度,`0` 表示全部 |
| `--speed-concurrency` | `1` | speed 阶段并发测速数量 |
| `--speed-timeout` | `10` | speed 阶段单个下载测速 timeout 秒数 |
| `--xray` | `tools/xray.exe` | Xray 内核路径 |
| `--ping-output` | `txt/auto_01_ping_ips.txt` | 第一阶段输出 |
| `--latency-output` | `txt/auto_02_latency.txt` | 第二阶段输出 |
| `-o, --output` | `txt/result.txt` | 最终节点输出 |
### run.bat
Windows 批处理入口,用于固定执行推荐三阶段流程。
```powershell
.\run.bat "vless://你的节点"
```
等价于依次执行:
```powershell
uv run auto --stage ping -r 3000
uv run auto --stage latency "vless://你的节点" -p 200
uv run auto --stage speed "vless://你的节点" --speed-count 0 -p 10 --speed-concurrency 5 --speed-timeout 10
```
### cfst
Cloudflare IPv4 探测工具。
```powershell
uv run cfst --fetch --pick-count 300 -p 20
```
用途:
- 拉取 Cloudflare IPv4 段。
- 随机抽样测试 TCP 或 ICMP 连通性。
- 输出较快 IP 和完整 CSV。
### xrayst
对指定 IP 或 IP 组做真实 Xray 测试。
```powershell
uv run xrayst --node 'vless://你的节点' --ip 103.21.244.162
uv run xrayst --node 'vless://你的节点' --ips 103.21.244.162,104.18.178.45
```
+351
View File
@@ -0,0 +1,351 @@
import argparse
import asyncio
import json
import math
import random
import subprocess
import sys
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import quote, urlsplit, urlunsplit
from rich.console import Console
from rich.table import Table
import cfst
import xrayst
DEFAULT_OUTPUT = Path("txt/result.txt")
PING_OUTPUT = Path("txt/auto_01_ping_ips.txt")
LATENCY_OUTPUT = Path("txt/auto_02_latency.txt")
DOWNLOAD_BYTES = 10_000_000
TARGET_PORT = 443
CF_TIMEOUT_MS = 1000
XRAY_LATENCY_TIMEOUT_S = 3
XRAY_DOWNLOAD_TIMEOUT_S = 10
console = Console()
@dataclass(frozen=True)
class LatencyResult:
ip: str
latency_ms: float
node_url: str
def speed_bucket(speed_mbps: float) -> int:
return math.floor(speed_mbps)
def sort_key(result: xrayst.TestResult) -> tuple[int, float]:
return -speed_bucket(result.speed_mbps), result.latency_ms
def make_alias(result: xrayst.TestResult) -> str:
speed = f"{result.speed_mbps:.2f}M"
latency = f"{result.latency_ms:.0f}ms"
return f"{speed}-{latency}-{result.ip}"
def with_alias(node_url: str, alias: str) -> str:
parts = urlsplit(node_url)
return urlunsplit((parts.scheme, parts.netloc, parts.path, parts.query, quote(alias)))
async def ping_candidates(random_count: int, concurrency: int, output_path: Path) -> list[str]:
networks = cfst.load_networks(cfst.DEFAULT_IP_FILE, "", not cfst.DEFAULT_IP_FILE.exists())
if not networks:
return []
if not cfst.DEFAULT_IP_FILE.exists():
cfst.save_ipv4_file(cfst.DEFAULT_IP_FILE, networks)
candidates = cfst.random_ips_from_networks(networks, random_count)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text("", encoding="utf-8")
semaphore = asyncio.Semaphore(concurrency)
async def probe_one(ip: str) -> cfst.ProbeResult:
return await cfst.probe_ip(ip, "tcp", TARGET_PORT, 1, CF_TIMEOUT_MS, semaphore)
tasks = [asyncio.create_task(probe_one(ip)) for ip in candidates]
passed: list[str] = []
with output_path.open("a", encoding="utf-8") as fp:
for task in asyncio.as_completed(tasks):
result = await task
if result.received > 0:
passed.append(result.ip)
fp.write(result.ip + "\n")
fp.flush()
console.print(f"OK {result.ip:<15}")
else:
console.print(f"[dim]FAIL {result.ip:<15}[/dim]")
return passed
def run_xray_proxy_measure(
node: xrayst.VlessNode,
ip: str,
xray_path: Path,
measure,
):
local_port = xrayst.pick_free_port()
node_url = xrayst.rewritten_vless_url(node, ip, TARGET_PORT, False)
config = xrayst.build_xray_config(node, ip, local_port, TARGET_PORT, False)
with tempfile.TemporaryDirectory(prefix="auto_xray_") as tmp:
config_path = Path(tmp) / "config.json"
config_path.write_text(json.dumps(config, ensure_ascii=False, indent=2), encoding="utf-8")
process = subprocess.Popen(
[str(xray_path), "run", "-config", str(config_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
try:
if not xrayst.wait_port(local_port, timeout_s=5):
raise RuntimeError("xray local proxy did not start")
return node_url, measure(local_port)
finally:
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
def test_real_latency(
node: xrayst.VlessNode,
ips: list[str],
xray_path: Path,
keep_count: int,
output_path: Path,
concurrency: int,
) -> list[LatencyResult]:
results: list[LatencyResult] = []
with ThreadPoolExecutor(max_workers=max(1, concurrency)) as executor:
futures = {
executor.submit(
run_xray_proxy_measure,
node,
ip,
xray_path,
lambda port: xrayst.measure_latency(port, xrayst.DEFAULT_LATENCY_URL, XRAY_LATENCY_TIMEOUT_S),
): ip
for ip in ips
}
for future in as_completed(futures):
ip = futures[future]
try:
node_url, latency_ms = future.result()
item = LatencyResult(ip=ip, latency_ms=latency_ms, node_url=node_url)
results.append(item)
console.print(f"{ip:<15} {latency_ms:>8.2f} ms")
except Exception as exc:
console.print(f"[dim]FAIL {ip:<15} {exc}[/dim]")
selected = sorted(results, key=lambda item: item.latency_ms)[:keep_count]
write_latency_results(output_path, selected)
return selected
def test_real_speed(
latency_results: list[LatencyResult],
xray_path: Path,
node: xrayst.VlessNode,
keep_count: int,
timeout_s: float,
concurrency: int,
) -> list[xrayst.TestResult]:
results: list[xrayst.TestResult] = []
def test_one(latency_result: LatencyResult) -> xrayst.TestResult:
node_url, speed_data = run_xray_proxy_measure(
node,
latency_result.ip,
xray_path,
lambda port: xrayst.measure_download(
port,
xrayst.DEFAULT_DOWNLOAD_URL,
timeout_s,
DOWNLOAD_BYTES,
),
)
speed_mbps, bytes_read, elapsed_s = speed_data
return xrayst.TestResult(
ip=latency_result.ip,
ok=True,
latency_ms=latency_result.latency_ms,
speed_mbps=speed_mbps,
bytes_read=bytes_read,
elapsed_s=elapsed_s,
node_url=node_url,
)
with ThreadPoolExecutor(max_workers=max(1, concurrency)) as executor:
futures = {executor.submit(test_one, item): item for item in latency_results}
for future in as_completed(futures):
latency_result = futures[future]
try:
item = future.result()
results.append(item)
console.print(
f"{item.ip:<15} {item.latency_ms:>8.2f} ms "
f"{item.speed_mbps:>8.2f} Mbps"
)
except Exception as exc:
console.print(f"[dim]FAIL {latency_result.ip:<15} {exc}[/dim]")
return sorted(results, key=sort_key)[:keep_count]
def write_ip_list(path: Path, ips: list[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(ips) + ("\n" if ips else ""), encoding="utf-8")
def read_ip_list(path: Path) -> list[str]:
if not path.exists():
return []
return [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
def write_latency_results(path: Path, results: list[LatencyResult]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
lines = [f"{item.ip}\t{item.latency_ms:.2f}\t{item.node_url}" for item in results]
path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")
def read_latency_results(path: Path) -> list[LatencyResult]:
if not path.exists():
return []
results: list[LatencyResult] = []
for line in path.read_text(encoding="utf-8").splitlines():
parts = line.split("\t", maxsplit=2)
if len(parts) != 3:
continue
ip, latency_text, node_url = parts
try:
latency_ms = float(latency_text)
except ValueError:
continue
results.append(LatencyResult(ip=ip, latency_ms=latency_ms, node_url=node_url))
return results
def write_nodes(path: Path, results: list[xrayst.TestResult]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
nodes = [with_alias(result.node_url, make_alias(result)) for result in results]
path.write_text("\n".join(nodes) + ("\n" if nodes else ""), encoding="utf-8")
def print_results(results: list[xrayst.TestResult]) -> None:
table = Table(title=f"Final Top {len(results)}")
table.add_column("IP")
table.add_column("Latency", justify="right")
table.add_column("Speed", justify="right")
table.add_column("Bucket", justify="right")
for result in results:
table.add_row(
result.ip,
f"{result.latency_ms:.2f} ms",
f"{result.speed_mbps:.2f} Mbps",
f"{speed_bucket(result.speed_mbps)}M",
)
console.print(table)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="One-command Cloudflare IP + xray real speed tester.")
parser.add_argument("node", nargs="?", help="Input vless:// node URL. Required for latency, speed, and all stages.")
parser.add_argument("--stage", choices=["all", "ping", "latency", "speed"], default="all", help="Run one stage or the full pipeline.")
parser.add_argument("-r", "--random-count", type=int, default=100, help="Random candidate IP count.")
parser.add_argument("-p", "--top", type=int, default=10, help="Final node count to keep.")
parser.add_argument("--speed-count", type=int, default=0, help="How many latency results to speed-test. 0 means all.")
parser.add_argument("-c", "--concurrency", type=int, default=20, help="Concurrent probes for ping and latency stages.")
parser.add_argument("--speed-concurrency", type=int, default=1, help="Concurrent speed tests.")
parser.add_argument("--speed-timeout", type=float, default=XRAY_DOWNLOAD_TIMEOUT_S, help="Download speed test timeout seconds.")
parser.add_argument("--xray", type=Path, default=xrayst.DEFAULT_XRAY, help="Path to xray executable.")
parser.add_argument("--ping-output", type=Path, default=PING_OUTPUT, help="Stage 1 output / stage 2 input IP file.")
parser.add_argument("--latency-output", type=Path, default=LATENCY_OUTPUT, help="Stage 2 output / stage 3 input latency file.")
parser.add_argument("-o", "--output", type=Path, default=DEFAULT_OUTPUT, help="Output txt path.")
if len(sys.argv) == 1:
parser.print_help()
raise SystemExit(0)
return parser.parse_args()
async def main() -> int:
args = parse_args()
random.seed(time.time_ns())
needs_node = args.stage in {"all", "latency", "speed"}
if needs_node and not args.node:
console.print("[red]VLESS node is required for this stage.[/red]")
return 1
if needs_node and not args.xray.exists():
console.print(f"[red]xray not found:[/red] {args.xray}")
return 1
node: xrayst.VlessNode | None = None
if needs_node:
try:
node = xrayst.parse_vless(args.node)
except ValueError as exc:
console.print(f"[red]{exc}[/red]")
return 1
if args.stage in {"all", "ping"}:
console.print(f"Random candidate count: [cyan]{args.random_count}[/cyan]")
if args.stage in {"all", "latency", "speed"}:
console.print(f"Final keep count: [cyan]{args.top}[/cyan]")
if args.stage in {"all", "ping"}:
console.print("1/3 TCP probing random IPs...")
ips = await ping_candidates(args.random_count, args.concurrency, args.ping_output)
if not ips:
console.print("[red]No IP passed the TCP probe.[/red]")
return 1
console.print(f"Passed IPs: [cyan]{len(ips)}[/cyan], saved: [green]{args.ping_output}[/green]")
if args.stage == "ping":
return 0
if args.stage in {"all", "latency"}:
console.print("2/3 Testing real xray latency...")
ips = read_ip_list(args.ping_output)
if not ips:
console.print(f"[red]No IPs found in {args.ping_output}.[/red]")
return 1
console.print(f"Loaded ping IPs: [cyan]{len(ips)}[/cyan] from [green]{args.ping_output}[/green]")
latency_results = test_real_latency(node, ips, args.xray, args.top, args.latency_output, args.concurrency)
if not latency_results:
console.print("[red]No IP passed the real latency test.[/red]")
return 1
console.print(f"Latency results saved: [green]{args.latency_output}[/green]")
if args.stage == "latency":
return 0
console.print("3/3 Testing real download speed...")
latency_results = read_latency_results(args.latency_output)
if not latency_results:
console.print(f"[red]No latency results found in {args.latency_output}.[/red]")
return 1
if args.speed_count > 0:
latency_results = latency_results[: args.speed_count]
console.print(f"Loaded latency results: [cyan]{len(latency_results)}[/cyan] from [green]{args.latency_output}[/green]")
selected = test_real_speed(latency_results, args.xray, node, args.top, args.speed_timeout, args.speed_concurrency)
write_nodes(args.output, selected)
console.print()
print_results(selected)
console.print(f"Saved nodes: [green]{args.output}[/green]")
return 0 if selected else 1
def cli() -> None:
raise SystemExit(asyncio.run(main()))
if __name__ == "__main__":
cli()
+366
View File
@@ -0,0 +1,366 @@
import argparse
import asyncio
import csv
import ipaddress
import platform
import random
import sys
import time
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from rich.console import Console
from rich.table import Table
CF_IPV4_URL = "https://www.cloudflare.com/ips-v4"
DEFAULT_IP_FILE = Path("txt/cloudflare_cdn_ips.txt")
DEFAULT_AVAILABLE_FILE = Path("txt/available_cloudflare_ipv4_groups.txt")
DEFAULT_FAST_FILE = Path("txt/fast_cloudflare_ipv4_ips.txt")
DEFAULT_CSV_FILE = Path("txt/cfst_result.csv")
console = Console()
@dataclass(frozen=True)
class ProbeResult:
ip: str
sent: int
received: int
avg_latency_ms: float
@property
def loss_rate(self) -> float:
return (self.sent - self.received) / self.sent if self.sent else 1.0
def fetch_cloudflare_ipv4() -> list[ipaddress.IPv4Network]:
request = urllib.request.Request(
CF_IPV4_URL,
headers={"User-Agent": "Mozilla/5.0 cfst-local/0.1"},
)
with urllib.request.urlopen(request, timeout=15) as response:
text = response.read().decode("utf-8")
return parse_network_lines(text.splitlines())
def parse_network_lines(lines: list[str]) -> list[ipaddress.IPv4Network]:
networks: list[ipaddress.IPv4Network] = []
for raw_line in lines:
token = raw_line.strip().split(maxsplit=1)[0] if raw_line.strip() else ""
try:
network = ipaddress.ip_network(token, strict=False)
except ValueError:
continue
if isinstance(network, ipaddress.IPv4Network):
networks.append(network)
return networks
def load_networks(ip_file: Path, ip_text: str, fetch: bool) -> list[ipaddress.IPv4Network]:
if ip_text:
return parse_network_lines([item.strip() for item in ip_text.split(",")])
if fetch:
return fetch_cloudflare_ipv4()
return parse_network_lines(ip_file.read_text(encoding="utf-8").splitlines())
def save_ipv4_file(path: Path, networks: list[ipaddress.IPv4Network]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
lines = [
"Cloudflare CDN IPv4 ranges",
f"Source: {CF_IPV4_URL}",
"",
*[str(network) for network in networks],
]
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def network_bounds(network: ipaddress.IPv4Network) -> tuple[int, int]:
first = int(network.network_address)
last = int(network.broadcast_address)
if network.num_addresses > 2:
first += 1
last -= 1
return first, last
def random_ips_from_network(network: ipaddress.IPv4Network, count: int) -> list[str]:
first, last = network_bounds(network)
sample_size = min(count, last - first + 1)
values = random.sample(range(first, last + 1), sample_size)
return [str(ipaddress.IPv4Address(value)) for value in values]
def random_ips_from_networks(networks: list[ipaddress.IPv4Network], count: int) -> list[str]:
selected: set[str] = set()
max_attempts = max(count * 20, 100)
attempts = 0
while len(selected) < count and attempts < max_attempts:
network = random.choice(networks)
first, last = network_bounds(network)
selected.add(str(ipaddress.IPv4Address(random.randint(first, last))))
attempts += 1
return list(selected)
def ping_command(ip: str, timeout_ms: int) -> list[str]:
if platform.system().lower() == "windows":
return ["ping", "-n", "1", "-w", str(timeout_ms), ip]
timeout_seconds = max(1, round(timeout_ms / 1000))
return ["ping", "-c", "1", "-W", str(timeout_seconds), ip]
async def icmp_once(ip: str, timeout_ms: int) -> float | None:
started = time.perf_counter()
process = await asyncio.create_subprocess_exec(
*ping_command(ip, timeout_ms),
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
ok = await process.wait() == 0
if not ok:
return None
return (time.perf_counter() - started) * 1000
async def tcp_once(ip: str, port: int, timeout_ms: int) -> float | None:
started = time.perf_counter()
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port),
timeout=timeout_ms / 1000,
)
writer.close()
await writer.wait_closed()
except (OSError, TimeoutError, asyncio.TimeoutError):
return None
return (time.perf_counter() - started) * 1000
async def probe_ip(
ip: str,
mode: str,
port: int,
times: int,
timeout_ms: int,
semaphore: asyncio.Semaphore,
) -> ProbeResult:
received = 0
total_latency = 0.0
async with semaphore:
for _ in range(times):
if mode == "tcp":
latency = await tcp_once(ip, port, timeout_ms)
else:
latency = await icmp_once(ip, timeout_ms)
if latency is not None:
received += 1
total_latency += latency
avg_latency = total_latency / received if received else 0.0
return ProbeResult(ip=ip, sent=times, received=received, avg_latency_ms=avg_latency)
async def probe_ips(
ips: list[str],
mode: str,
port: int,
times: int,
timeout_ms: int,
concurrency: int,
) -> list[ProbeResult]:
semaphore = asyncio.Semaphore(concurrency)
tasks = [
asyncio.create_task(probe_ip(ip, mode, port, times, timeout_ms, semaphore))
for ip in ips
]
return await asyncio.gather(*tasks)
async def find_available_groups(
networks: list[ipaddress.IPv4Network],
sample_count: int,
mode: str,
port: int,
timeout_ms: int,
concurrency: int,
) -> list[tuple[ipaddress.IPv4Network, list[ProbeResult]]]:
group_jobs: list[tuple[ipaddress.IPv4Network, list[str]]] = [
(network, random_ips_from_network(network, sample_count)) for network in networks
]
all_ips = [ip for _, ips in group_jobs for ip in ips]
results = await probe_ips(all_ips, mode, port, 1, timeout_ms, concurrency)
result_map = {result.ip: result for result in results}
return [
(network, [result_map[ip] for ip in ips if result_map[ip].received > 0])
for network, ips in group_jobs
]
def filter_and_sort_results(
results: list[ProbeResult],
min_latency_ms: float,
max_latency_ms: float,
max_loss_rate: float,
) -> list[ProbeResult]:
filtered = [
result
for result in results
if result.received > 0
and min_latency_ms <= result.avg_latency_ms <= max_latency_ms
and result.loss_rate <= max_loss_rate
]
return sorted(filtered, key=lambda item: (item.loss_rate, item.avg_latency_ms))
def write_available_groups(path: Path, groups: list[tuple[ipaddress.IPv4Network, list[ProbeResult]]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
lines = ["Available Cloudflare IPv4 groups", ""]
for network, alive in groups:
if alive:
sample_alive = ",".join(result.ip for result in alive)
lines.append(f"{network} alive={len(alive)} sample_alive={sample_alive}")
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def write_fast_ips(path: Path, results: list[ProbeResult]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
lines = ["Fast Cloudflare IPv4 IPs", ""]
lines.extend(
f"{result.ip} sent={result.sent} received={result.received} "
f"loss={result.loss_rate:.2f} latency_ms={result.avg_latency_ms:.2f}"
for result in results
)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def write_csv(path: Path, results: list[ProbeResult]) -> None:
if str(path).strip() == "":
return
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8-sig") as fp:
writer = csv.writer(fp)
writer.writerow(["IP 地址", "已发送", "已接收", "丢包率", "平均延迟"])
for result in results:
writer.writerow(
[
result.ip,
result.sent,
result.received,
f"{result.loss_rate:.2f}",
f"{result.avg_latency_ms:.2f}",
]
)
def print_table(results: list[ProbeResult], limit: int) -> None:
if limit == 0:
return
selected = results[:limit]
if not selected:
console.print("[yellow]No reachable IP matched the filters.[/yellow]")
return
table = Table(title=f"Top {len(selected)} Fast Cloudflare IPv4")
table.add_column("IP", justify="left")
table.add_column("Sent", justify="right")
table.add_column("Recv", justify="right")
table.add_column("Loss", justify="right")
table.add_column("Avg Latency", justify="right")
for result in selected:
table.add_row(
result.ip,
str(result.sent),
str(result.received),
f"{result.loss_rate:.2f}",
f"{result.avg_latency_ms:.2f} ms",
)
console.print(table)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Single-file Cloudflare IPv4 speed tester.")
parser.add_argument("-n", "--concurrency", type=int, default=200, help="Concurrent probes. Default: 200.")
parser.add_argument("-t", "--times", type=int, default=4, help="Probe times per IP. Default: 4.")
parser.add_argument("-tp", "--port", type=int, default=443, help="TCP probe port. Default: 443.")
parser.add_argument("-p", "--print-count", type=int, default=10, help="Result count to print. Default: 10.")
parser.add_argument("-f", "--file", type=Path, default=DEFAULT_IP_FILE, help="IPv4 CIDR file.")
parser.add_argument("-ip", "--ip-text", default="", help="CIDR/IP list, comma separated.")
parser.add_argument("-o", "--output", type=Path, default=DEFAULT_CSV_FILE, help="CSV output file.")
parser.add_argument("--mode", choices=["tcp", "icmp"], default="tcp", help="Probe mode. Default: tcp.")
parser.add_argument("--fetch", action="store_true", help="Fetch latest Cloudflare IPv4 ranges before testing.")
parser.add_argument("--group-sample", type=int, default=10, help="Random IPs per CIDR for group availability.")
parser.add_argument("--pick-count", type=int, default=100, help="Random IP count from available groups for ranking.")
parser.add_argument("--timeout-ms", type=int, default=1000, help="Timeout per probe in milliseconds.")
parser.add_argument("--tl", type=float, default=9999, help="Maximum average latency in ms.")
parser.add_argument("--tll", type=float, default=0, help="Minimum average latency in ms.")
parser.add_argument("--tlr", type=float, default=1.0, help="Maximum loss rate, 0.0 to 1.0.")
parser.add_argument("--available-output", type=Path, default=DEFAULT_AVAILABLE_FILE)
parser.add_argument("--fast-output", type=Path, default=DEFAULT_FAST_FILE)
if len(sys.argv) == 1:
parser.print_help()
raise SystemExit(0)
return parser.parse_args()
async def main() -> int:
args = parse_args()
random.seed(time.time_ns())
networks = load_networks(args.file, args.ip_text, args.fetch or not args.file.exists())
if not networks:
console.print("[red]No IPv4 CIDR ranges found.[/red]")
return 1
if args.fetch or not args.file.exists():
save_ipv4_file(args.file, networks)
console.print(f"Loaded IPv4 groups: [cyan]{len(networks)}[/cyan]")
console.print(f"Probe mode: [cyan]{args.mode.upper()}[/cyan]" + (f", port: [cyan]{args.port}[/cyan]" if args.mode == "tcp" else ""))
console.print("Checking available groups...")
group_results = await find_available_groups(
networks,
args.group_sample,
args.mode,
args.port,
args.timeout_ms,
args.concurrency,
)
available_networks = [network for network, alive in group_results if alive]
write_available_groups(args.available_output, group_results)
console.print(f"Available groups: [cyan]{len(available_networks)}/{len(networks)}[/cyan]")
if not available_networks:
console.print(f"Saved: {args.available_output}")
return 1
candidates = random_ips_from_networks(available_networks, args.pick_count)
console.print(f"Random picked IPs: [cyan]{len(candidates)}[/cyan]")
console.print("Testing latency...")
tested = await probe_ips(
candidates,
args.mode,
args.port,
args.times,
args.timeout_ms,
args.concurrency,
)
ranked = filter_and_sort_results(tested, args.tll, args.tl, args.tlr)
selected = ranked[: args.print_count if args.print_count > 0 else len(ranked)]
console.print()
print_table(ranked, args.print_count)
write_fast_ips(args.fast_output, selected)
write_csv(args.output, ranked)
console.print()
console.print(f"Saved available groups: [green]{args.available_output}[/green]")
console.print(f"Saved fast IPs: [green]{args.fast_output}[/green]")
console.print(f"Saved CSV: [green]{args.output}[/green]")
return 0
def cli() -> None:
raise SystemExit(asyncio.run(main()))
if __name__ == "__main__":
cli()
+20
View File
@@ -0,0 +1,20 @@
[project]
name = "fast-xray"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
"httpx>=0.28.1",
"rich>=15.0.0",
]
[project.scripts]
auto = "auto:cli"
cfst = "cfst:cli"
xrayst = "xrayst:cli"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
only-include = ["auto.py", "cfst.py", "xrayst.py"]
+20
View File
@@ -0,0 +1,20 @@
@echo off
setlocal EnableExtensions EnableDelayedExpansion
if "%~1"=="" (
echo Usage: run.bat "vless://..."
exit /b 1
)
set "NODE=%~1"
uv run auto --stage ping -r 3000
if errorlevel 1 exit /b %errorlevel%
uv run auto --stage latency "!NODE!" -p 200
if errorlevel 1 exit /b %errorlevel%
uv run auto --stage speed "!NODE!" --speed-count 0 -p 10 --speed-concurrency 5 --speed-timeout 10
if errorlevel 1 exit /b %errorlevel%
endlocal
+18
View File
@@ -0,0 +1,18 @@
Cloudflare CDN IPv4 ranges
Source: https://www.cloudflare.com/ips/
173.245.48.0/20
103.21.244.0/22
103.22.200.0/22
103.31.4.0/22
141.101.64.0/18
108.162.192.0/18
190.93.240.0/20
188.114.96.0/20
197.234.240.0/22
198.41.128.0/17
162.158.0.0/15
104.16.0.0/13
104.24.0.0/14
172.64.0.0/13
131.0.72.0/22
Generated
+128
View File
@@ -0,0 +1,128 @@
version = 1
revision = 3
requires-python = ">=3.13"
[[package]]
name = "anyio"
version = "4.14.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/1c/b5/001890774a9552aff22502b8da382593109ce0c95314abaebbb116567545/anyio-4.14.0.tar.gz", hash = "sha256:b47c1f9ccf73e67021df785332508f99379c68fa7d0684e8e3492cb1d4b23f89", size = 253586, upload-time = "2026-06-15T22:00:49.021Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ba/16/9826f089383c593cdfc4a6e5aca94d9e91ae1692c57af82c3b2aa5e810f7/anyio-4.14.0-py3-none-any.whl", hash = "sha256:dd9b7a2a9799ed6552fde617b2c5df02b7fdd7d88392fc48101e51bae46164d9", size = 123506, upload-time = "2026-06-15T22:00:47.595Z" },
]
[[package]]
name = "certifi"
version = "2026.6.17"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c9/c7/424b75da314c1045981bd9777432fad05a9e0c69daa4ed7e308bbaffe405/certifi-2026.6.17.tar.gz", hash = "sha256:024c88eeec92ca068db80f02b8b07c9cef7b9fe261d1d535abfd5abd6f6af432", size = 134594, upload-time = "2026-06-17T10:31:07.894Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ef/2f/c5464532e965badff2f4c4c1a3a83f5697f0d7c407ed0cda44aaa99bb451/certifi-2026.6.17-py3-none-any.whl", hash = "sha256:2227dcbaafe0d2f59279d1762ddddc37783ed4354594f194ffc31d20f41fc3db", size = 133289, upload-time = "2026-06-17T10:31:06.348Z" },
]
[[package]]
name = "fast-xray"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "httpx" },
{ name = "rich" },
]
[package.metadata]
requires-dist = [
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "rich", specifier = ">=15.0.0" },
]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]]
name = "idna"
version = "3.18"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/cd/63/9496c57188a2ee585e0f1db071d75089a11e98aa86eb99d9d7618fc1edce/idna-3.18.tar.gz", hash = "sha256:ffb385a7e039654cef1ab9ef32c6fafe283c0c0467bba1d9029738ce4a14a848", size = 196711, upload-time = "2026-06-02T14:34:07.794Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/5e/d4e9f1a599fb8e573b7b87160658329fbf28d19eac2718f51fc3def3aa5a/idna-3.18-py3-none-any.whl", hash = "sha256:7f952cbe720b688055e3f87de14f5c3e5fdaa8bc3928985c4077ca689de849a2", size = 65455, upload-time = "2026-06-02T14:34:06.319Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/ff/7841249c247aa650a76b9ee4bbaeae59370dc8bfd2f6c01f3630c35eb134/markdown_it_py-4.2.0.tar.gz", hash = "sha256:04a21681d6fbb623de53f6f364d352309d4094dd4194040a10fd51833e418d49", size = 82454, upload-time = "2026-05-07T12:08:28.36Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/81/4da04ced5a082363ecfa159c010d200ecbd959ae410c10c0264a38cac0f5/markdown_it_py-4.2.0-py3-none-any.whl", hash = "sha256:9f7ebbcd14fe59494226453aed97c1070d83f8d24b6fc3a3bcf9a38092641c4a", size = 91687, upload-time = "2026-05-07T12:08:27.182Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "pygments"
version = "2.20.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" },
]
[[package]]
name = "rich"
version = "15.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" },
]
+380
View File
@@ -0,0 +1,380 @@
import argparse
import csv
import ipaddress
import json
import re
import socket
import subprocess
import sys
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import parse_qs, quote, urlencode, urlparse
import httpx
from rich.console import Console
from rich.table import Table
DEFAULT_XRAY = Path("tools/xray.exe")
DEFAULT_IP_FILE = Path("txt/fast_cloudflare_ipv4_ips.txt")
DEFAULT_OUTPUT = Path("txt/xrayst_result.csv")
DEFAULT_LATENCY_URL = "https://www.google.com/generate_204"
DEFAULT_DOWNLOAD_URL = "https://cachefly.cachefly.net/50mb.test"
DEFAULT_LATENCY_TIMEOUT_S = 1.0
DEFAULT_DOWNLOAD_TIMEOUT_S = 10.0
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
console = Console()
@dataclass(frozen=True)
class VlessNode:
uuid: str
original_host: str
original_port: int
remark: str
encryption: str
network: str
path: str
@dataclass(frozen=True)
class TestResult:
ip: str
ok: bool
latency_ms: float
speed_mbps: float
bytes_read: int
elapsed_s: float
node_url: str
error: str = ""
def parse_vless(url: str) -> VlessNode:
parsed = urlparse(url)
if parsed.scheme != "vless":
raise ValueError("Only vless:// nodes are supported.")
query = parse_qs(parsed.query)
if not parsed.username or not parsed.hostname:
raise ValueError("Invalid VLESS URL: missing uuid or host.")
return VlessNode(
uuid=parsed.username,
original_host=parsed.hostname,
original_port=parsed.port or 443,
remark=parsed.fragment,
encryption=query.get("encryption", ["none"])[0],
network=query.get("type", ["ws"])[0],
path=query.get("path", ["/"])[0],
)
def valid_ip(value: str) -> str | None:
try:
return str(ipaddress.ip_address(value.strip()))
except ValueError:
return None
def load_ips(single_ip: str, ip_text: str, ip_file: Path) -> list[str]:
values: list[str] = []
if single_ip:
values.append(single_ip.strip())
elif ip_text:
values.extend(item.strip() for item in ip_text.split(","))
elif ip_file.exists():
values.extend(IP_RE.findall(ip_file.read_text(encoding="utf-8")))
seen: set[str] = set()
ips: list[str] = []
for value in values:
ip = valid_ip(value)
if ip and ip not in seen:
seen.add(ip)
ips.append(ip)
return ips
def pick_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return int(sock.getsockname()[1])
def wait_port(port: int, timeout_s: float) -> bool:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
try:
with socket.create_connection(("127.0.0.1", port), timeout=0.2):
return True
except OSError:
time.sleep(0.05)
return False
def rewritten_vless_url(node: VlessNode, ip: str, target_port: int, allow_insecure: bool) -> str:
query = {
"encryption": node.encryption,
"security": "tls",
"insecure": "1" if allow_insecure else "0",
"allowInsecure": "1" if allow_insecure else "0",
"type": node.network,
"host": node.original_host,
"path": node.path,
}
return f"vless://{node.uuid}@{ip}:{target_port}?{urlencode(query, quote_via=quote)}#{quote(node.remark)}"
def build_xray_config(
node: VlessNode,
ip: str,
local_port: int,
target_port: int,
allow_insecure: bool,
) -> dict:
return {
"log": {"loglevel": "warning"},
"inbounds": [
{
"tag": "http-in",
"listen": "127.0.0.1",
"port": local_port,
"protocol": "http",
"settings": {"timeout": 0},
}
],
"outbounds": [
{
"tag": "proxy",
"protocol": "vless",
"settings": {
"vnext": [
{
"address": ip,
"port": target_port,
"users": [{"id": node.uuid, "encryption": node.encryption}],
}
]
},
"streamSettings": {
"network": node.network,
"security": "tls",
"tlsSettings": {
"serverName": node.original_host,
"allowInsecure": allow_insecure,
},
"wsSettings": {
"path": node.path,
"headers": {"Host": node.original_host},
},
},
}
],
"routing": {
"domainStrategy": "AsIs",
"rules": [{"type": "field", "inboundTag": ["http-in"], "outboundTag": "proxy"}],
},
}
def measure_latency(proxy_port: int, url: str, timeout_s: float) -> float:
proxy = f"http://127.0.0.1:{proxy_port}"
timeout = httpx.Timeout(timeout_s, connect=timeout_s)
started = time.perf_counter()
with httpx.Client(proxy=proxy, timeout=timeout, follow_redirects=False) as client:
response = client.get(url)
response.raise_for_status()
return (time.perf_counter() - started) * 1000
def measure_download(proxy_port: int, url: str, timeout_s: float, limit_bytes: int) -> tuple[float, int, float]:
proxy = f"http://127.0.0.1:{proxy_port}"
timeout = httpx.Timeout(timeout_s, connect=timeout_s)
started = time.perf_counter()
bytes_read = 0
with httpx.Client(proxy=proxy, timeout=timeout, follow_redirects=True) as client:
with client.stream("GET", url) as response:
response.raise_for_status()
for chunk in response.iter_bytes(chunk_size=65536):
if not chunk:
continue
bytes_read += len(chunk)
if bytes_read >= limit_bytes:
break
ended = time.perf_counter()
elapsed_s = max(ended - started, 0.001)
speed_mbps = bytes_read * 8 / elapsed_s / 1_000_000
return speed_mbps, bytes_read, elapsed_s
def test_ip(
xray: Path,
node: VlessNode,
ip: str,
target_port: int,
allow_insecure: bool,
latency_url: str,
download_url: str,
latency_timeout_s: float,
download_timeout_s: float,
limit_bytes: int,
) -> TestResult:
node_url = rewritten_vless_url(node, ip, target_port, allow_insecure)
local_port = pick_free_port()
config = build_xray_config(node, ip, local_port, target_port, allow_insecure)
with tempfile.TemporaryDirectory(prefix="xrayst_") as tmp:
config_path = Path(tmp) / "config.json"
config_path.write_text(json.dumps(config, ensure_ascii=False, indent=2), encoding="utf-8")
process = subprocess.Popen(
[str(xray), "run", "-config", str(config_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
try:
if not wait_port(local_port, timeout_s=5):
return TestResult(ip, False, 0, 0, 0, 0, node_url, "xray local proxy did not start")
latency_ms = measure_latency(local_port, latency_url, latency_timeout_s)
speed_mbps, bytes_read, elapsed_s = measure_download(
local_port,
download_url,
download_timeout_s,
limit_bytes,
)
return TestResult(ip, True, latency_ms, speed_mbps, bytes_read, elapsed_s, node_url)
except Exception as exc:
return TestResult(ip, False, 0, 0, 0, 0, node_url, str(exc))
finally:
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
def write_csv(path: Path, results: list[TestResult]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8-sig") as fp:
writer = csv.writer(fp)
writer.writerow(["IP", "OK", "Latency(ms)", "Speed(Mbps)", "Bytes", "Elapsed(s)", "Node", "Error"])
for item in results:
writer.writerow(
[
item.ip,
"1" if item.ok else "0",
f"{item.latency_ms:.2f}",
f"{item.speed_mbps:.2f}",
item.bytes_read,
f"{item.elapsed_s:.2f}",
item.node_url,
item.error,
]
)
def print_results(results: list[TestResult], limit: int) -> None:
table = Table(title=f"Xray Real Speed Test Top {min(limit, len(results))}")
table.add_column("IP")
table.add_column("Latency", justify="right")
table.add_column("Speed", justify="right")
table.add_column("Bytes", justify="right")
table.add_column("Node")
for item in results[:limit]:
table.add_row(
item.ip,
f"{item.latency_ms:.2f} ms",
f"{item.speed_mbps:.2f} Mbps",
str(item.bytes_read),
item.node_url,
)
console.print(table)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Test real VLESS node latency and 10MB download speed with xray.")
parser.add_argument("--xray", type=Path, default=DEFAULT_XRAY, help="Path to xray executable.")
parser.add_argument("--node", required=True, help="Input vless:// node URL.")
parser.add_argument("--ip", default="", help="Single candidate IP.")
parser.add_argument("--ips", default="", help="Candidate IP group, comma separated.")
parser.add_argument("--ip-file", type=Path, default=DEFAULT_IP_FILE, help="Candidate IP file.")
parser.add_argument("-o", "--output", type=Path, default=DEFAULT_OUTPUT, help="CSV output path.")
parser.add_argument("-n", "--concurrency", type=int, default=3, help="Concurrent xray processes.")
parser.add_argument("-p", "--print-count", type=int, default=10, help="Result count to print.")
parser.add_argument("--target-port", type=int, default=443, help="Remote VLESS port to test.")
parser.add_argument("--allow-insecure", action="store_true", help="Allow insecure TLS certificate.")
parser.add_argument("--latency-url", default=DEFAULT_LATENCY_URL, help="Real link latency test URL.")
parser.add_argument("--download-url", default=DEFAULT_DOWNLOAD_URL, help="Download speed test URL.")
parser.add_argument("--bytes", type=int, default=10_000_000, help="Download byte limit. Default: 10MB.")
parser.add_argument("--latency-timeout", type=float, default=DEFAULT_LATENCY_TIMEOUT_S, help="Latency request timeout seconds.")
parser.add_argument("--download-timeout", type=float, default=DEFAULT_DOWNLOAD_TIMEOUT_S, help="Download request timeout seconds.")
if len(sys.argv) == 1:
parser.print_help()
raise SystemExit(0)
return parser.parse_args()
def main() -> int:
args = parse_args()
if not args.xray.exists():
console.print(f"[red]xray not found:[/red] {args.xray}")
return 1
try:
node = parse_vless(args.node)
except ValueError as exc:
console.print(f"[red]{exc}[/red]")
return 1
ips = load_ips(args.ip, args.ips, args.ip_file)
if not ips:
console.print("[red]No valid candidate IPs found.[/red]")
return 1
console.print(f"Candidate IPs: [cyan]{len(ips)}[/cyan]")
console.print(f"Node host: [cyan]{node.original_host}[/cyan], WS path: [cyan]{node.path}[/cyan]")
console.print(f"Latency URL: [cyan]{args.latency_url}[/cyan]")
console.print(f"Download URL: [cyan]{args.download_url}[/cyan]")
console.print(f"Download limit: [cyan]{args.bytes}[/cyan] bytes")
results: list[TestResult] = []
with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as executor:
futures = [
executor.submit(
test_ip,
args.xray,
node,
ip,
args.target_port,
args.allow_insecure,
args.latency_url,
args.download_url,
args.latency_timeout,
args.download_timeout,
args.bytes,
)
for ip in ips
]
for future in as_completed(futures):
item = future.result()
results.append(item)
status = "OK" if item.ok else "FAIL"
console.print(f"{status:<4} {item.ip:<15} {item.latency_ms:>8.2f} ms {item.speed_mbps:>8.2f} Mbps")
ranked = sorted((item for item in results if item.ok), key=lambda item: (-item.speed_mbps, item.latency_ms))
failed = [item for item in results if not item.ok]
write_csv(args.output, ranked + failed)
console.print()
if ranked:
print_results(ranked, args.print_count)
else:
console.print("[yellow]No IP passed the xray download test.[/yellow]")
console.print(f"Saved CSV: [green]{args.output}[/green]")
return 0 if ranked else 1
def cli() -> None:
raise SystemExit(main())
if __name__ == "__main__":
cli()