12 Commits

42 changed files with 1813 additions and 164 deletions
+31 -1
View File
@@ -37,7 +37,7 @@ flowchart LR
| `data/settings.toml` | 保存配置页设置。 |
| `data/download.toml` | 保存下载页设置。 |
| `data/config.json` | 生成给 Xray 使用的配置。 |
| `data/xray/` | 保存 `xray``geoip.dat``geosite.dat`。 |
| `data/xray/` | 保存 `xray` / `xray.exe``geoip.dat``geosite.dat`。 |
| `data/transparent/` | 保存透明代理脚本、nftables 配置和 `tinytun.yaml`。 |
| `data/xray.log` | 保存 Xray 输出和 pyxray 运行日志。 |
@@ -82,6 +82,35 @@ http://<host-ip>:8080
7. 点击“启动 Xray”。
8. 在“日志”页确认 Xray 和透明代理脚本执行结果。
## CLI
启动 Web 控制台:
```bash
pyxray web --host 127.0.0.1 --port 3309 --xray-dir data/xray
```
查看配置文件:
```bash
pyxray configs --download
pyxray configs --settings
```
清理配置/下载资源:
```bash
pyxray clear --download
pyxray clear --all
```
直接下载或补齐 Xray 资源:
```bash
pyxray download --target all --directory data/xray --force
pyxray download --target geoip --geoip-url https://example.invalid/geoip.dat
```
## 透明代理建议
| 场景 | 建议 |
@@ -133,5 +162,6 @@ http://<host-ip>:8080
| 路由 | [docs/config/routing.md](docs/config/routing.md) |
| DNS | [docs/config/dns.md](docs/config/dns.md) |
| 透明代理 | [docs/config/transparent.md](docs/config/transparent.md) |
| 透明代理 iptables 排查 | [docs/transparent-iptables.md](docs/transparent-iptables.md) |
| 出站和自动更新 | [docs/config/outbounds-auto-update.md](docs/config/outbounds-auto-update.md) |
| Xray 资源下载 | [docs/config/assets.md](docs/config/assets.md) |
View File
+3 -2
View File
@@ -20,10 +20,11 @@ UI 中有些字段隐藏但仍存在于 `settings.toml`。下表的“UI”列
| 分类 | 文档 | 主要影响 |
| --- | --- | --- |
| 核心 | [config/core.md](config/core.md) | Xray 日志、Mux、TCP Fast Open。 |
| 入站 | [config/inbounds.md](config/inbounds.md) | SOCKS/HTTP/rule/API/自定义入站。 |
| 入站 | [config/inbounds.md](config/inbounds.md) | Mixed/rule/API/自定义入站。 |
| 路由 | [config/routing.md](config/routing.md) | rule 入站和透明代理流量的分流策略。 |
| DNS | [config/dns.md](config/dns.md) | Xray DNS 模块、本地 DNS 入站、DNS 服务器路由。 |
| 透明代理 | [config/transparent.md](config/transparent.md) | transparent inbound、iptables/nft、resolv、TinyTun。 |
| 透明代理 iptables 排查 | [transparent-iptables.md](transparent-iptables.md) | 当前 redirect 规则、宿主机查看方式、链和计数器含义。 |
| 出站和自动更新 | [config/outbounds-auto-update.md](config/outbounds-auto-update.md) | 出站组预留字段、自动更新预留字段。 |
| 资源下载 | [config/assets.md](config/assets.md) | `xray``geoip.dat``geosite.dat` 下载设置。 |
@@ -45,7 +46,7 @@ UI 中有些字段隐藏但仍存在于 `settings.toml`。下表的“UI”列
| 设置 | 默认值 | 说明 |
| --- | --- | --- |
| `core.log_level` | `info` | 日常日志等级。 |
| `inbounds.rule_http_port` | `20172` | 规则 HTTP 代理入口。 |
| `inbounds.rule_http_port` | `20172` | 规则 Mixed 代理入口。 |
| `routing.mode` | `whitelist` | 国内/私有直连,其它代理。 |
| `transparent.mode` | `close` | 默认不启用透明代理。 |
| `transparent.type` | `redirect` | 推荐先用 redirect。 |
+4 -2
View File
@@ -5,7 +5,7 @@
| 设置 | UI | 默认值 | 可选值 | 作用 | 什么时候修改 |
| --- | --- | --- | --- | --- | --- |
| `directory` | 下载页 | `data/xray`Docker 中通常为 `/config/xray` | 路径 | Xray 资源保存目录。 | Docker 部署保持 `/config/xray`;本机运行可用默认值。 |
| `version` | 下载页 | `v26.5.9` | Xray release tag | 官方 release 版本。 | 需要固定或升级 Xray 版本时修改。 |
| `version` | 下载页 | 首次打开时优先使用最新 release;获取失败回退 `v26.5.9` | Xray release tag | 官方 release 版本。 | 需要固定或升级 Xray 版本时修改。 |
| `archive_url` | 下载页 | `""` | URL | 自定义 Xray release zip 地址;为空时用官方地址。 | 官方下载慢或使用镜像时修改。 |
| `geoip_url` | 下载页 | `""` | URL | 自定义 `geoip.dat` 下载地址。 | 需要替换 geoip 数据源时修改。 |
| `geosite_url` | 下载页 | `""` | URL | 自定义 `geosite.dat` 下载地址。 | 需要替换 geosite 数据源时修改。 |
@@ -17,7 +17,7 @@
| 文件 | 作用 |
| --- | --- |
| `xray` | Xray 可执行文件。 |
| `xray` / `xray.exe` | Xray 可执行文件;Windows 下为 `xray.exe`,其它平台为 `xray`。 |
| `geoip.dat` | IP 地理库,用于 `geoip:*` 规则。 |
| `geosite.dat` | 域名分类库,用于 `geosite:*` 规则。 |
@@ -26,7 +26,9 @@
| 条件 | 行为 |
| --- | --- |
| `target = all` | 确保三个必需文件都存在。 |
| 首次无 `download.toml` | 尝试在 5 秒内获取 Xray-core 最新 release tag;失败则使用内置回退版本。 |
| `force = false` 且文件存在 | 跳过已有文件。 |
| `force = true` | 覆盖目标文件。 |
| `archive_url` 为空 | 按当前平台选择官方 release zipWindows x64 使用 `Xray-windows-64.zip`Linux x64 使用 `Xray-linux-64.zip`。 |
| `geoip_url` / `geosite_url` 非空 | 对应 dat 文件使用自定义 URL,优先于 release zip 内置版本。 |
| Docker 部署 | 资源仍由 Web 下载页处理,不在 Dockerfile 中下载。 |
+39
View File
@@ -8,9 +8,47 @@
| `mux_enabled` | 间接显示 | `false` | `bool` | 控制主 `proxy` outbound 是否生成 `mux`。UI 通过 `mux_concurrency = 0` 表示关闭。 | 节点支持且希望复用连接时开启;兼容性异常时关闭。 |
| `mux_concurrency` | 显示 | `8` | UI`0/1/2/4/8/16/32/64`;模型:`1-1024` | `mux.concurrency`。UI 选择 `0` 时保存为 `mux_enabled = false` 且并发恢复为 `8`。 | 高并发小连接场景可尝试 `8/16`;不确定用 `0`。 |
| `tcp_fast_open` | 隐藏 | `default` | `default` / `yes` / `no` | 非 `default` 时写入 outbound `streamSettings.sockopt.tcpFastOpen`。 | 只有明确知道系统和网络支持 TFO 时修改。 |
| `transparent.output_bypass_rules` | 显示 | `""` | 每行一条 `tcp/udp/all 目标[:端口]` | 在 transparent 系统规则的本机 `OUTPUT` 链前置 RETURN,避免宿主机进程被透明代理截获。 | easytier、其它 host network 服务需要直连固定 peer 时修改。 |
| `ss_backend` | 隐藏 | `""` | 字符串 | 预留字段,当前不影响 Xray JSON。 | 当前不用改。 |
| `trojan_backend` | 隐藏 | `""` | 字符串 | 预留字段,当前不影响 Xray JSON。 | 当前不用改。 |
## transparent
核心卡片里的 `transparent` 输入框对应 `[transparent].output_bypass_rules`
格式:
```text
tcp 117.72.47.28:33010
all 192.168.0.0/24
udp 198.51.100.10:3478
```
规则含义:
| 写法 | 作用 |
| --- | --- |
| `tcp 117.72.47.28:33010` | 本机 TCP 访问该 IP 和端口时直连,不进入 transparent。 |
| `all 192.168.0.0/24` | 本机访问该网段时直连;redirect 下只生成 TCPtproxy 下生成 TCP 和 UDP。 |
| `udp 198.51.100.10:3478` | tproxy 下本机 UDP 访问该 IP 和端口时直连;redirect 下忽略 UDP。 |
典型场景:
```text
tcp 117.72.47.28:33010
```
用于避免 easytier 这类 `network_mode: host` 服务的 peer 连接被 `nat OUTPUT -> TP_OUT -> REDIRECT` 截获。
生成顺序:
```sh
iptables -t nat -A TP_OUT -p tcp -d 117.72.47.28 --dport 33010 -j RETURN
iptables -t nat -A TP_OUT -j TP_RULE
```
该设置只影响宿主机本机 `OUTPUT` 流量,不影响 Docker 容器透明代理的 `PREROUTING` 流量。
## 生成影响
| 条件 | 生成结果 |
@@ -20,3 +58,4 @@
| `log_level = none` | `log.loglevel = none``access = "none"``error = "none"`。 |
| `mux_enabled = true` | 主代理 outbound 增加 `mux.enabled = true``mux.concurrency`。 |
| `tcp_fast_open != default` | `proxy` / `direct` outbound 增加 `sockopt.tcpFastOpen`。 |
| `transparent.output_bypass_rules` 非空 | 在 `TP_OUT` 跳转 `TP_RULE` 前生成 OUTPUT 绕过 RETURN 规则。 |
+45 -3
View File
@@ -8,8 +8,10 @@
| `port_sharing` | 隐藏 | `false` | `bool` | 为 `true` 时监听地址强制变成 `0.0.0.0`。 | 当前 UI 用 `listen` 控制,通常不改。 |
| `socks_port` | 隐藏 | `20170` | `0-65535` | 普通 SOCKS 入站,流量最终兜底走 `proxy`。UI 保存时写 `0`。 | 需要无规则 SOCKS 入口时手改。 |
| `http_port` | 隐藏 | `20171` | `0-65535` | 普通 HTTP 入站,流量最终兜底走 `proxy`。UI 保存时写 `0`。 | 需要无规则 HTTP 入口时手改。 |
| `rule_socks_port` | 显示 | `0` | `0-65535` | 规则 SOCKS 入站,流量按 `[routing]` 规则分流。UI 显示值会在为 `0` 时回退展示 `socks_port`。 | 应用要用 SOCKS 并希望按规则分流时设置。 |
| `rule_http_port` | 显示 | `20172` | `0-65535` | 规则 HTTP 入站,流量按 `[routing]` 规则分流。 | 浏览器/系统显式 HTTP 代理时使用。 |
| `rule_socks_port` | 隐藏 | `0` | `0-65535` | 旧版规则 SOCKS 入站兼容字段。当前 UI 保存时写 `0`。 | 通常不改。 |
| `rule_http_port` | 显示为 Mixed 端口 | `20172` | `0-65535` | 规则 mixed 入站,同一个端口同时支持 HTTP 和 SOCKS,流量按 `[routing]` 规则分流。 | 浏览器系统代理、CLI 工具显式代理时使用。 |
| `auth_user` | 显示 | `""` | 字符串 | mixed/SOCKS 入站认证用户名。 | 需要给局域网开放代理但不想裸奔时设置。 |
| `auth_password` | 显示 | `""` | 字符串 | mixed/SOCKS 入站认证密码。 | 与 `auth_user` 一起设置;两者都为空时使用 `noauth`。 |
| `vmess_port` | 隐藏 | `0` | `0-65535` | 额外 VMess 入站。`0` 不生成。 | 当前很少需要。 |
| `inbound_sniffing` | 显示 | `http,tls,quic` | `disable` / `http,tls` / `http,tls,quic` | 写入每个支持入站的 `sniffing.destOverride`。 | 域名路由不准时保持开启;兼容性异常时降级或关闭。 |
| `route_only` | 显示 | `false` | `bool` | 写入 `sniffing.routeOnly`。 | 只希望嗅探域名用于路由、不改连接目标时开启。 |
@@ -31,6 +33,46 @@
| 入站 tag | 来源 | 路由行为 |
| --- | --- | --- |
| `socks` / `http` | `socks_port` / `http_port` | 不进入 `[routing]` 模式规则,最终兜底走 `proxy`。 |
| `rule-socks` / `rule-http` | `rule_socks_port` / `rule_http_port` | 进入 `[routing]` 模式规则。 |
| `rule-mixed` | `rule_http_port` | 同一端口支持 HTTP 和 SOCKS进入 `[routing]` 模式规则。 |
| `vmess` | `vmess_port` | 额外 VMess 入站。 |
| `api-in` | `api.port` | 路由到 `api-out`。 |
## Mixed 入站
`rule-mixed` 生成示例:
```json
{
"tag": "rule-mixed",
"listen": "0.0.0.0",
"port": 20172,
"protocol": "mixed",
"settings": {
"auth": "noauth",
"udp": true,
"allowTransparent": false
}
}
```
如果填写用户名和密码:
```json
"settings": {
"auth": "password",
"udp": true,
"allowTransparent": false,
"accounts": [
{"user": "alice", "pass": "secret"}
]
}
```
使用方式:
```sh
curl -x http://10.11.11.100:20172 http://google.com/
curl --socks5-hostname 10.11.11.100:20172 https://google.com/
curl -x http://alice:secret@10.11.11.100:20172 http://google.com/
curl --socks5-hostname alice:secret@10.11.11.100:20172 https://google.com/
```
+74
View File
@@ -0,0 +1,74 @@
# easytier 连接 peer 超时
## 问题原因
`easytier` 使用 `network_mode: host`,它发起的 peer 连接属于宿主机本机流量,会经过 `nat OUTPUT`
`pyxray` 开启 transparent redirect 后,会把宿主机本机 TCP 流量转到 Xray transparent inbound
```sh
iptables -t nat -I OUTPUT -p tcp -j TP_OUT
iptables -t nat -A TP_OUT -j TP_RULE
iptables -t nat -A TP_RULE -p tcp -j REDIRECT --to-ports 52345
```
因此 easytier 访问 peer 时,连接会被改写:
```mermaid
flowchart LR
E[easytier-core<br/>tcp://117.72.47.28:33010] --> O[nat OUTPUT]
O --> TPO[TP_OUT]
TPO --> TPR[TP_RULE]
TPR --> R[REDIRECT :52345]
R --> X[Xray transparent inbound]
```
结果是 easytier 没有直连到自己的 peer,日志表现为:
```text
connecting to peer dst=tcp://117.72.47.28:33010
connect to peer error ... Timeout
```
## 解决方案
在 UI 的“核心 -> transparent”里添加 OUTPUT 绕过规则,让 easytier peer 连接在进入 `TP_RULE` 前直接 `RETURN`
示例:
```text
tcp 117.72.47.28:33010
```
生成后的关键规则:
```sh
iptables -t nat -A TP_OUT -p tcp -d 117.72.47.28 --dport 33010 -j RETURN
iptables -t nat -A TP_OUT -j TP_RULE
```
修复后的流量路径:
```mermaid
flowchart LR
E[easytier-core<br/>tcp://117.72.47.28:33010] --> O[nat OUTPUT]
O --> TPO[TP_OUT]
TPO --> B{match tcp<br/>117.72.47.28:33010}
B -->|yes| D[RETURN<br/>直连 peer]
B -->|no| TPR[TP_RULE]
TPR --> R[REDIRECT :52345]
```
规则格式:
```text
tcp 117.72.47.28:33010
all 192.168.0.0/24
udp 198.51.100.10:3478
```
说明:
- `redirect` 模式只处理 TCP,因此只生成 TCP 绕过规则。
- `tproxy` 模式支持 TCP 和 UDP。
- 规则只作用于宿主机本机 `OUTPUT`,不改变 Docker 容器透明代理的 `PREROUTING` 行为。
+89
View File
@@ -0,0 +1,89 @@
# VLESS Reality Vision 开启 mux 后连接被关闭
## 问题
在节点使用 `VLESS + REALITY + xtls-rprx-vision` 时,开启 `mux` 后,透明代理和本地 HTTP/SOCKS 代理都会出现请求失败。
现象:
```text
curl google.com
curl: (52) Empty reply from server
```
Xray 日志:
```text
common/mux: dispatching request to tcp:google.com:80
proxy/vless/outbound: tunneling request to tcp:v1.mux.cool:9527
common/mux: failed to read metadata > io: read/write on closed pipe
```
关闭 `mux` 后,同一节点恢复正常:
```text
curl http://google.com/ -> HTTP/1.1 301
curl https://google.com/ -> HTTP/2 301
HTTP/SOCKS inbound 测试 -> 正常
```
## 原因
当前失败不在 transparent/iptables,而在 Xray outbound 层。
`mux` 会把多个 TCP 请求封装进一个 Mux.Cool 连接;官方文档说明它用于减少 TCP 握手延迟,默认关闭,并且不用于提升吞吐。`xtls-rprx-vision` 是 VLESS 的 XTLS flow,官方文档说明它在 `TCP + TLS/REALITY` 下会对 TLS 1.3 数据走底层直拷路径。
两者叠加时,业务连接不再按普通 VLESS 请求直接发送,而是先被封装成 `v1.mux.cool` 子连接:
```mermaid
flowchart LR
A[curl / app] --> I[Xray inbound]
I --> R[routing -> proxy]
R --> M[mux<br/>v1.mux.cool]
M --> V[VLESS<br/>flow=xtls-rprx-vision]
V --> T[REALITY/TCP server]
T --> X[server closes pipe]
```
Xray-core 讨论区有同类案例:配置为 `VLESS + REALITY + TCP + xtls-rprx-vision + mux` 时,请求报 `curl: (52) Empty reply from server`,服务端日志出现 `common/mux``closed pipe` 类错误;去掉 `mux` 后恢复。
因此这里的结论是:该节点组合下 `mux``xtls-rprx-vision` 不兼容或服务端不接受 mux 封装后的请求。
## 解决方案
在 UI 的“核心设置”里关闭 `mux`,保存并重启 Xray。
生成配置中不要出现:
```json
"mux": {
"enabled": true,
"concurrency": 4
}
```
修复后路径:
```mermaid
flowchart LR
A[curl / app] --> I[Xray inbound]
I --> R[routing -> proxy]
R --> V[VLESS<br/>flow=xtls-rprx-vision]
V --> T[REALITY/TCP server]
T --> G[google.com / target]
```
验证命令:
```sh
curl -v http://google.com/
curl -vk https://google.com/
curl -v -x http://127.0.0.1:20172 http://google.com/
curl -v --socks5-hostname 127.0.0.1:20170 https://google.com/
```
参考:
- [Project X: Outbound Proxy (Mux, XUDP)](https://xtls.github.io/en/config/outbound.html)
- [Project X: VLESS (XTLS Vision Seed)](https://xtls.github.io/en/config/inbounds/vless.html)
- [XTLS/Xray-core discussion #5481](https://github.com/XTLS/Xray-core/discussions/5481)
+1 -1
View File
@@ -167,7 +167,7 @@ tags = ["google"]
| 问题 | 原因 | 处理 |
| --- | --- | --- |
| 规则没生效 | 入口不是 rule 入站或透明代理入站。 | 使用 `rule_http_port` / `rule_socks_port`,或开启透明代理。 |
| 规则没生效 | 入口不是 rule 入站或透明代理入站。 | 使用 `rule_http_port` 对应的 mixed 端口,或开启透明代理。 |
| 域名规则没命中 | 流量只有 IP,没有域名。 | 开启 sniffing,或改用 `ip(...)` 规则。 |
| IP 规则导致 DNS 查询 | 当前 pyxray 生成 `domainStrategy = "IPOnDemand"`。 | 避免过度使用 IP 规则,或接受 Xray 为路由进行 DNS 解析。 |
| `routing_a` 里的 `default:` 无效 | pyxray 解析器只识别 `domain(...)``ip(...)`。 | 用 `default_rule` 设置兜底。 |
+302
View File
@@ -0,0 +1,302 @@
# 透明代理 iptables 规则说明
本文说明 `transparent.type = "redirect"` 时,pyxray 当前会生成哪些 `iptables` 规则、这些规则是什么意思,以及如何查看宿主机当前是否生效。
## 当前配置
当前 `data/settings.toml` 中透明代理相关配置类似:
```toml
[transparent]
mode = "pac"
type = "redirect"
port = 52345
docker_transparent = true
docker_transparent_cidrs = "172.16.0.0/12"
output_bypass_rules = "all 117.72.47.28"
```
含义:
| 配置 | 含义 |
| --- | --- |
| `mode = "pac"` | 透明代理流量进入 Xray 后,复用 `[routing].mode` 的分流策略。 |
| `type = "redirect"` | 使用 `iptables nat` 表的 `REDIRECT`,主要处理 TCP 流量。 |
| `port = 52345` | 被拦截的 TCP 流量会转发到本机 Xray transparent inbound 端口。 |
| `docker_transparent = true` | 额外处理来自 Docker 网段的容器流量。 |
| `docker_transparent_cidrs = "172.16.0.0/12"` | 只有源地址在该 CIDR 内的 Docker 容器流量会进入 `PREROUTING` 透明代理规则。 |
| `output_bypass_rules = "all 117.72.47.28"` | 宿主机本机访问 `117.72.47.28` 时直接放行,不进入透明代理。redirect 下实际只生成 TCP 绕过规则。 |
## 生成文件
透明代理规则文件生成在:
```text
data/transparent/
```
常用文件:
| 文件 | 作用 |
| --- | --- |
| `transparent-iptables-setup.sh` | 安装 iptables 规则。 |
| `transparent-iptables-cleanup.sh` | 清理 iptables 规则。 |
| `transparent-nft-setup.sh` | nftables 后端安装脚本。 |
| `transparent-nft-cleanup.sh` | nftables 后端清理脚本。 |
| `v2raya.nft` | nftables 后端规则表。 |
| `ip-forward-apply.sh` | 写 `/proc/sys/net/ipv4/ip_forward` 和 IPv6 forwarding。 |
| `resolv-hijack-setup.sh` | DNS 劫持时改写 `/etc/resolv.conf`。 |
| `resolv-hijack-cleanup.sh` | 停止或回滚时恢复 `/etc/resolv.conf`。 |
直接查看生成的 iptables 脚本:
```bash
sed -n '1,220p' data/transparent/transparent-iptables-setup.sh
sed -n '1,220p' data/transparent/transparent-iptables-cleanup.sh
```
如果在 Docker 宿主机上查看挂载目录:
```bash
sed -n '1,220p' ./data/transparent/transparent-iptables-setup.sh
```
## 当前会生成的 redirect 规则
当前 `redirect` 模式使用 `nat` 表,并创建 3 条自定义链:
| 链 | 作用 |
| --- | --- |
| `TP_OUT` | 处理宿主机本机进程发起的 TCP 流量,也就是 `OUTPUT` 流量。 |
| `TP_PRE` | 处理进入宿主机的 TCP 流量,也就是 `PREROUTING` 流量;主要用于 Docker 容器或其它转发流量。 |
| `TP_RULE` | 统一判断哪些目标直连,哪些目标重定向到 Xray。 |
核心结构:
```bash
iptables -t nat -N TP_OUT
iptables -t nat -N TP_PRE
iptables -t nat -N TP_RULE
iptables -t nat -I OUTPUT -p tcp -j TP_OUT
iptables -t nat -I PREROUTING -p tcp -j TP_PRE
iptables -t nat -A TP_PRE -s 172.16.0.0/12 -j TP_RULE
iptables -t nat -A TP_OUT -j TP_RULE
iptables -t nat -A TP_RULE -p tcp -j REDIRECT --to-ports 52345
```
执行路径:
| 流量来源 | 路径 |
| --- | --- |
| 宿主机本机进程访问外部 TCP | `nat OUTPUT -> TP_OUT -> TP_RULE -> REDIRECT :52345` |
| Docker 容器访问外部 TCP,源地址匹配 `172.16.0.0/12` | `nat PREROUTING -> TP_PRE -> TP_RULE -> REDIRECT :52345` |
| Docker 容器源地址不匹配 `docker_transparent_cidrs` | 进入 `TP_PRE` 后不会跳到 `TP_RULE`,不会被 pyxray redirect。 |
| 访问保留地址、内网地址、本机接口地址、绕过目标 | 在 `TP_RULE``RETURN`,不进入 Xray。 |
## TP_RULE 里的 RETURN 是什么意思
`TP_RULE` 前半段是一批 `RETURN` 规则,用来避免把不该代理的流量送进 Xray。
常见规则:
```bash
iptables -t nat -A TP_RULE -d 10.0.0.0/8 -j RETURN
iptables -t nat -A TP_RULE -d 127.0.0.0/8 -j RETURN
iptables -t nat -A TP_RULE -d 172.16.0.0/12 -j RETURN
iptables -t nat -A TP_RULE -d 192.168.0.0/16 -j RETURN
iptables -t nat -A TP_RULE -m mark --mark 0x80/0x80 -j RETURN
iptables -t nat -A TP_RULE -i wg+ -j RETURN
iptables -t nat -A TP_RULE -i ppp+ -j RETURN
```
含义:
| 规则 | 含义 |
| --- | --- |
| `-d 10.0.0.0/8 -j RETURN` | 访问私有网段时直连。 |
| `-d 127.0.0.0/8 -j RETURN` | 访问本机回环地址时直连,避免回环。 |
| `-d 172.16.0.0/12 -j RETURN` | 访问 Docker 或内网私有地址时直连。 |
| `-d 192.168.0.0/16 -j RETURN` | 访问局域网地址时直连。 |
| `-m mark --mark 0x80/0x80 -j RETURN` | 已打过特定标记的流量跳过,避免重复处理。 |
| `-i wg+ -j RETURN` | 从 WireGuard 之类接口进入的流量跳过。 |
| `-i ppp+ -j RETURN` | 从 PPP 之类接口进入的流量跳过。 |
pyxray 还会把宿主机当前 IPv4 地址所在 CIDR 加入 RETURN
```bash
ip -o -4 addr show | awk '{print $4}'
```
这部分用于避免访问宿主机本机地址或本地接口地址时被透明代理截获。
## output_bypass_rules 生成的规则
当前配置:
```toml
output_bypass_rules = "all 117.72.47.28"
```
redirect 模式只处理 TCP,所以会生成:
```bash
iptables -t nat -A TP_OUT -p tcp -d 117.72.47.28 -j RETURN
```
它的位置在 `TP_OUT -> TP_RULE` 之前,含义是:
```text
宿主机本机进程访问 117.72.47.28 的 TCP 流量直接 RETURN,不进入 TP_RULE,也不会 REDIRECT 到 52345。
```
这个规则只影响宿主机本机 `OUTPUT` 流量,不影响 Docker 容器从 `PREROUTING` 进入的流量。
## 如何查看当前生效状态
宿主机直接查看:
```bash
sudo iptables -t nat -S
sudo iptables -t nat -L -n -v
sudo iptables-save -t nat
```
只看 pyxray 透明代理相关链:
```bash
sudo iptables -t nat -S TP_OUT
sudo iptables -t nat -S TP_PRE
sudo iptables -t nat -S TP_RULE
```
在容器里查看:
```bash
docker exec -it pyxray iptables -t nat -S
docker exec -it pyxray iptables -t nat -S TP_OUT
docker exec -it pyxray iptables -t nat -S TP_PRE
docker exec -it pyxray iptables -t nat -S TP_RULE
```
当前 `compose.yaml` 使用:
```yaml
network_mode: host
privileged: true
```
因此容器里执行的 `iptables` 作用在宿主机网络命名空间。正常情况下,宿主机和容器里看到的是同一套规则。
## 看不到规则时怎么判断原因
先确认 pyxray 是否还在运行:
```bash
docker ps | grep pyxray
docker logs pyxray --tail 80
```
查看 pyxray 自己的透明代理执行日志:
```bash
grep 'pyxray transparent' data/xray.log | tail -80
```
如果看到类似:
```text
setup transparent iptables: /bin/sh data/transparent/transparent-iptables-setup.sh
```
说明启动时使用了 iptables 后端。
如果最后看到的是:
```text
cleanup transparent iptables: /bin/sh data/transparent/transparent-iptables-cleanup.sh
```
说明停止或回滚时已经清理过规则,宿主机上可能就看不到 `TP_OUT``TP_PRE``TP_RULE`
如果容器里能看到、宿主机看不到,检查 `iptables` 前端是否一致:
```bash
sudo iptables --version
docker exec pyxray iptables --version
sudo iptables-nft -t nat -S
sudo iptables-legacy -t nat -S
sudo nft list ruleset
```
有些系统同时存在 `iptables-nft``iptables-legacy`。如果宿主机默认命令和容器里的命令使用不同后端,看到的规则可能不一致。
## 如何判断流量是否命中规则
查看计数器:
```bash
sudo iptables -t nat -L TP_OUT -n -v
sudo iptables -t nat -L TP_PRE -n -v
sudo iptables -t nat -L TP_RULE -n -v
```
关注字段:
| 字段 | 含义 |
| --- | --- |
| `pkts` | 命中该规则的数据包数量。 |
| `bytes` | 命中该规则的字节数。 |
| `REDIRECT tcp -- anywhere anywhere redir ports 52345` | 命中后会被转发到 Xray transparent inbound。 |
| `RETURN` | 命中后从当前自定义链返回,不继续走 pyxray 后续规则。 |
测试前可以清零计数器:
```bash
sudo iptables -t nat -Z TP_OUT
sudo iptables -t nat -Z TP_PRE
sudo iptables -t nat -Z TP_RULE
```
然后从宿主机发起一个 TCP 访问,再查看计数器是否增加。
## 清理规则
正常停止 pyxray 时会执行:
```bash
data/transparent/transparent-iptables-cleanup.sh
```
手动清理:
```bash
sudo sh data/transparent/transparent-iptables-cleanup.sh
```
容器里手动清理:
```bash
docker exec -it pyxray sh /config/transparent/transparent-iptables-cleanup.sh
```
清理脚本会删除:
```bash
nat OUTPUT -> TP_OUT
nat PREROUTING -> TP_PRE
TP_OUT
TP_PRE
TP_RULE
```
## 代码位置
| 路径 | 作用 |
| --- | --- |
| `pyxray/libs/xray_config/transparent_rules.py` | 生成 iptables/nftables 脚本。 |
| `pyxray/libs/xray_transparent_runtime.py` | 启动、停止、回滚时执行脚本。 |
| `data/transparent/transparent-iptables-setup.sh` | 当前配置实际生成出的安装脚本。 |
| `data/transparent/transparent-iptables-cleanup.sh` | 当前配置实际生成出的清理脚本。 |
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "pyxray"
version = "1.0.0"
version = "1.0.5"
description = "A lightweight Linux xray control plane."
readme = "README.md"
requires-python = ">=3.14"
+6 -1
View File
@@ -1,5 +1,10 @@
"""pyxray package."""
from importlib.metadata import PackageNotFoundError, version
__all__ = ["__version__"]
__version__ = "0.1.0"
try:
__version__ = version("pyxray")
except PackageNotFoundError:
__version__ = "0.0.0"
+98 -5
View File
@@ -1,30 +1,123 @@
from __future__ import annotations
import argparse
from pathlib import Path
from pyxray.libs.app_data import (
CONFIG_FILES,
clear_all_data,
clear_download_data,
download_xray_assets,
read_config_file,
resolve_app_data_paths,
)
from pyxray.libs.xray_assets import ASSET_TARGETS
from pyxray.web.server import run_web
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 8000
DEFAULT_XRAY_DIR = "data/xray"
def main(argv: list[str] | None = None) -> None:
"""pyxray 命令行入口。"""
parser = argparse.ArgumentParser(prog="pyxray")
subparsers = parser.add_subparsers(dest="command")
_add_web_parser(subparsers)
_add_clear_parser(subparsers)
_add_configs_parser(subparsers)
_add_download_parser(subparsers)
args = parser.parse_args(argv)
if args.command in (None, "web"):
run_web(args.host, args.port, args.xray_dir)
if args.command is None:
run_web(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_XRAY_DIR)
return
args.func(args)
def _add_web_parser(subparsers: argparse._SubParsersAction) -> None:
"""注册 Web 服务启动参数。"""
parser = subparsers.add_parser("web", help="启动 Web 控制台")
parser.add_argument("--host", default="0.0.0.0", help="监听地址,默认 0.0.0.0")
parser.add_argument("--port", default=8000, type=int, help="监听端口,默认 8000")
parser.add_argument("--xray-dir", default="data/xray", help="Xray 资源目录,默认 data/xray")
parser.add_argument("--host", default=DEFAULT_HOST, help=f"监听地址,默认 {DEFAULT_HOST}")
parser.add_argument("--port", default=DEFAULT_PORT, type=int, help=f"监听端口,默认 {DEFAULT_PORT}")
parser.add_argument("--xray-dir", default=DEFAULT_XRAY_DIR, help=f"Xray 资源目录,默认 {DEFAULT_XRAY_DIR}")
parser.set_defaults(command="web")
parser.set_defaults(func=_run_web_command)
def _add_clear_parser(subparsers: argparse._SubParsersAction) -> None:
parser = subparsers.add_parser("clear", help="清除 pyxray 配置/数据文件")
scope = parser.add_mutually_exclusive_group(required=True)
scope.add_argument("--all", action="store_true", help="清除所有 pyxray 配置/数据文件和已知生成产物")
scope.add_argument("--download", action="store_true", help="清除下载设置和已知 Xray 下载产物")
parser.add_argument("--xray-dir", default=DEFAULT_XRAY_DIR, help=f"Xray 资源目录,默认 {DEFAULT_XRAY_DIR}")
parser.set_defaults(command="clear", func=_run_clear_command)
def _add_configs_parser(subparsers: argparse._SubParsersAction) -> None:
parser = subparsers.add_parser("configs", help="显示指定 pyxray 配置文件内容")
group = parser.add_mutually_exclusive_group(required=True)
for option in CONFIG_FILES:
group.add_argument(f"--{option.replace('_', '-')}", action="store_const", const=option, dest="config_name")
parser.add_argument("--xray-dir", default=DEFAULT_XRAY_DIR, help=f"Xray 资源目录,默认 {DEFAULT_XRAY_DIR}")
parser.set_defaults(command="configs", func=_run_configs_command)
def _add_download_parser(subparsers: argparse._SubParsersAction) -> None:
parser = subparsers.add_parser("download", help="下载或补齐 Xray 运行资源")
parser.add_argument("--target", choices=ASSET_TARGETS, default=None, help="下载目标:all、xray、geoip、geosite")
parser.add_argument("--directory", default=None, help=f"Xray 资源目录,默认读取 download.toml 或 {DEFAULT_XRAY_DIR}")
parser.add_argument("--version", default=None, help="Xray-core release 版本,例如 v26.5.9")
parser.add_argument("--force", action="store_true", default=None, help="覆盖并重新下载已存在文件")
parser.add_argument("--archive-url", default=None, help="自定义 Xray release zip URL")
parser.add_argument("--geoip-url", default=None, help="自定义 geoip.dat URL")
parser.add_argument("--geosite-url", default=None, help="自定义 geosite.dat URL")
parser.add_argument("--proxy-url", default=None, help="下载代理 URL")
parser.set_defaults(command="download", func=_run_download_command)
def _run_web_command(args: argparse.Namespace) -> None:
run_web(args.host, args.port, args.xray_dir)
def _run_clear_command(args: argparse.Namespace) -> None:
paths = resolve_app_data_paths(args.xray_dir)
result = clear_all_data(paths) if args.all else clear_download_data(paths)
for path in result.removed:
print(f"removed {path}")
if not result.removed:
print("nothing removed")
def _run_configs_command(args: argparse.Namespace) -> None:
paths = resolve_app_data_paths(args.xray_dir)
try:
print(read_config_file(paths, args.config_name), end="")
except FileNotFoundError as exc:
raise SystemExit(f"config file not found: {Path(exc.filename)}") from exc
def _run_download_command(args: argparse.Namespace) -> None:
directory = args.directory or DEFAULT_XRAY_DIR
paths = resolve_app_data_paths(directory)
overrides = {
"directory": args.directory,
"version": args.version,
"archive_url": args.archive_url,
"geoip_url": args.geoip_url,
"geosite_url": args.geosite_url,
"proxy_url": args.proxy_url,
"target": args.target,
"force": args.force,
}
assets = download_xray_assets(paths.download_settings, default_directory=directory, overrides=overrides)
print(f"directory: {assets.directory}")
print(f"downloaded: {', '.join(assets.downloaded) if assets.downloaded else 'none'}")
print(f"skipped: {', '.join(assets.skipped) if assets.skipped else 'none'}")
print(f"ready: {assets.ready}")
if __name__ == "__main__":
+149
View File
@@ -0,0 +1,149 @@
from __future__ import annotations
import shutil
from dataclasses import dataclass
from pathlib import Path
from pyxray.libs.xray_asset_settings import XrayAssetSettings, XrayAssetSettingsStore
from pyxray.libs.xray_assets import ensure_xray_assets
CONFIG_FILES = {
"download": "download.toml",
"nodes": "nodes.toml",
"settings": "settings.toml",
"config": "config.json",
"service_state": "service-state.json",
"log": "xray.log",
}
GENERATED_DIRS = ("transparent",)
DOWNLOAD_ASSET_FILES = ("xray", "xray.exe", "geoip.dat", "geosite.dat")
@dataclass(frozen=True, slots=True)
class AppDataPaths:
"""Filesystem layout shared by the Web app and CLI."""
data_dir: Path
xray_dir: Path
@property
def download_settings(self) -> Path:
return self.data_dir / CONFIG_FILES["download"]
@property
def nodes(self) -> Path:
return self.data_dir / CONFIG_FILES["nodes"]
@property
def settings(self) -> Path:
return self.data_dir / CONFIG_FILES["settings"]
@property
def generated_config(self) -> Path:
return self.data_dir / CONFIG_FILES["config"]
@property
def service_state(self) -> Path:
return self.data_dir / CONFIG_FILES["service_state"]
@property
def log(self) -> Path:
return self.data_dir / CONFIG_FILES["log"]
@property
def transparent_dir(self) -> Path:
return self.data_dir / "transparent"
def config_path(self, name: str) -> Path:
try:
return self.data_dir / CONFIG_FILES[name]
except KeyError as exc:
raise ValueError(f"unsupported config name: {name}") from exc
@dataclass(frozen=True, slots=True)
class ClearResult:
removed: tuple[Path, ...]
missing: tuple[Path, ...]
def resolve_app_data_paths(xray_dir: str | Path = "data/xray", *, data_dir: str | Path | None = None) -> AppDataPaths:
"""Resolve the pyxray data directory from the Xray asset directory."""
resolved_xray_dir = Path(xray_dir)
resolved_data_dir = Path(data_dir) if data_dir is not None else default_data_dir(resolved_xray_dir)
return AppDataPaths(data_dir=resolved_data_dir, xray_dir=resolved_xray_dir)
def default_data_dir(xray_dir: str | Path) -> Path:
"""Use data/xray -> data, matching the Web app's default layout."""
path = Path(xray_dir)
return path.parent if path.name == "xray" else path
def clear_download_data(paths: AppDataPaths) -> ClearResult:
"""Remove persisted download settings and known downloaded Xray assets only."""
return _remove_known_paths([paths.download_settings, *_download_asset_paths(paths.xray_dir)])
def clear_all_data(paths: AppDataPaths) -> ClearResult:
"""Remove pyxray-owned config/data files and generated artifacts."""
targets = [
*(paths.data_dir / filename for filename in CONFIG_FILES.values()),
*(paths.data_dir / name for name in GENERATED_DIRS),
*_download_asset_paths(paths.xray_dir),
]
return _remove_known_paths(targets)
def read_config_file(paths: AppDataPaths, name: str) -> str:
path = paths.config_path(name)
if not path.exists():
raise FileNotFoundError(path)
return path.read_text(encoding="utf-8")
def download_xray_assets(settings_path: str | Path, *, default_directory: str | Path, overrides: dict[str, object]):
"""Merge CLI options with download.toml, persist them, and ensure assets exist."""
store = XrayAssetSettingsStore(settings_path, default_directory=default_directory)
current = store.load()
values = current.to_dict()
for key, value in overrides.items():
if value is not None:
values[key] = value
settings = XrayAssetSettings.from_dict(values)
store.save(settings)
return ensure_xray_assets(
settings.directory,
version=settings.version,
archive_url=settings.archive_url or None,
geoip_url=settings.geoip_url or None,
geosite_url=settings.geosite_url or None,
proxy_url=settings.proxy_url or None,
target=settings.target,
force=settings.force,
)
def _download_asset_paths(xray_dir: Path) -> tuple[Path, ...]:
return tuple(xray_dir / name for name in DOWNLOAD_ASSET_FILES)
def _remove_known_paths(paths: list[Path]) -> ClearResult:
removed: list[Path] = []
missing: list[Path] = []
for path in dict.fromkeys(paths):
if not path.exists() and not path.is_symlink():
missing.append(path)
continue
if path.is_dir() and not path.is_symlink():
shutil.rmtree(path)
else:
path.unlink()
removed.append(path)
return ClearResult(removed=tuple(removed), missing=tuple(missing))
+3 -3
View File
@@ -8,7 +8,7 @@ from typing import Any
import tomlkit
from pyxray.libs.xray_assets import DEFAULT_VERSION
from pyxray.libs.xray_assets import default_xray_version
@dataclass(slots=True)
@@ -16,7 +16,7 @@ class XrayAssetSettings:
"""Xray 资源下载页面的可持久化设置。"""
directory: str = "data/xray"
version: str = DEFAULT_VERSION
version: str = ""
archive_url: str = ""
geoip_url: str = ""
geosite_url: str = ""
@@ -51,7 +51,7 @@ class XrayAssetSettingsStore:
def load(self) -> XrayAssetSettings:
if not self.path.exists():
return XrayAssetSettings(directory=self.default_directory)
return XrayAssetSettings(directory=self.default_directory, version=default_xray_version())
values = dict(tomlkit.parse(self.path.read_text(encoding="utf-8")))
if "directory" not in values:
values["directory"] = self.default_directory
+97 -11
View File
@@ -1,7 +1,10 @@
from __future__ import annotations
import os
import platform
import stat
import json
import urllib.parse
import urllib.request
import zipfile
from dataclasses import dataclass
@@ -11,13 +14,17 @@ from typing import Callable
OFFICIAL_RELEASE_BASE = "https://github.com/XTLS/Xray-core/releases/download"
OFFICIAL_LATEST_RELEASE_API = "https://api.github.com/repos/XTLS/Xray-core/releases/latest"
DEFAULT_VERSION = "v26.5.9"
DEFAULT_ARCHIVE_NAME = "Xray-linux-64.zip"
REQUIRED_FILES = ("xray", "geoip.dat", "geosite.dat")
REQUIRED_DATA_FILES = ("geoip.dat", "geosite.dat")
ASSET_TARGETS = ("all", "xray", "geoip", "geosite")
Downloader = Callable[[str], bytes]
DownloadProgress = Callable[[str, int, int | None], None]
VersionFetcher = Callable[[str, float], str]
_DEFAULT_VERSION_CACHE: str | None = None
@dataclass(frozen=True, slots=True)
@@ -58,10 +65,57 @@ class XrayAssetStatus:
return tuple(name for name, exists in self.files.items() if not exists)
def official_archive_url(version: str = DEFAULT_VERSION, archive_name: str = DEFAULT_ARCHIVE_NAME) -> str:
def xray_executable_name(os_name: str | None = None) -> str:
"""返回当前平台的 Xray 可执行文件名。"""
return "xray.exe" if (os_name or os.name) == "nt" else "xray"
def required_files(os_name: str | None = None) -> tuple[str, ...]:
"""返回当前平台运行 Xray 所需的核心文件。"""
return (xray_executable_name(os_name), *REQUIRED_DATA_FILES)
def default_archive_name(os_name: str | None = None, machine: str | None = None) -> str:
"""返回当前平台默认使用的 Xray-core release zip 文件名。"""
resolved_os = os_name or os.name
resolved_machine = (machine or platform.machine()).lower()
is_arm64 = resolved_machine in {"arm64", "aarch64"}
if resolved_os == "nt":
return "Xray-windows-arm64-v8a.zip" if is_arm64 else "Xray-windows-64.zip"
return "Xray-linux-arm64-v8a.zip" if is_arm64 else DEFAULT_ARCHIVE_NAME
def official_archive_url(version: str = DEFAULT_VERSION, archive_name: str | None = None) -> str:
"""返回官方 Xray-core release zip 下载地址。"""
return f"{OFFICIAL_RELEASE_BASE}/{version}/{archive_name}"
return f"{OFFICIAL_RELEASE_BASE}/{version}/{archive_name or default_archive_name()}"
def latest_xray_version(*, timeout: float = 5.0, fetcher: VersionFetcher | None = None) -> str:
"""从 GitHub release API 获取最新 Xray-core 版本。"""
payload = (fetcher or _fetch_url_text)(OFFICIAL_LATEST_RELEASE_API, timeout)
values = json.loads(payload)
tag = str(values.get("tag_name") or "").strip()
if not tag:
raise ValueError("latest Xray release response has no tag_name")
return tag
def default_xray_version(*, timeout: float = 5.0) -> str:
"""返回默认 Xray 版本;优先远程最新版本,失败时回退到内置版本。"""
global _DEFAULT_VERSION_CACHE
if _DEFAULT_VERSION_CACHE:
return _DEFAULT_VERSION_CACHE
try:
_DEFAULT_VERSION_CACHE = latest_xray_version(timeout=timeout)
except Exception: # noqa: BLE001
_DEFAULT_VERSION_CACHE = DEFAULT_VERSION
return _DEFAULT_VERSION_CACHE
def check_xray_assets(directory: str | Path) -> XrayAssetStatus:
@@ -70,7 +124,7 @@ def check_xray_assets(directory: str | Path) -> XrayAssetStatus:
directory = Path(directory)
return XrayAssetStatus(
directory=directory,
files={name: (directory / name).exists() for name in REQUIRED_FILES},
files={name: (directory / name).exists() for name in required_files()},
)
@@ -115,17 +169,18 @@ def ensure_xray_assets(
if target not in ASSET_TARGETS:
raise ValueError(f"unsupported xray asset target: {target}")
xray = directory / "xray"
xray = directory / xray_executable_name()
geoip = directory / "geoip.dat"
geosite = directory / "geosite.dat"
downloaded: list[str] = []
skipped: list[str] = []
requested = _requested_files(target)
xray_name = xray_executable_name()
archive_names = {
name
for name in requested
if name == "xray"
if name == xray_name
or (name == "geoip.dat" and geoip_url is None)
or (name == "geosite.dat" and geosite_url is None)
}
@@ -173,6 +228,11 @@ def download_bytes(url: str, *, proxy_url: str | None = None) -> bytes:
return response.read()
def _fetch_url_text(url: str, timeout: float) -> str:
with urllib.request.urlopen(url, timeout=timeout) as response: # noqa: S310
return response.read().decode("utf-8")
def download_bytes_stream(
url: str,
progress: DownloadProgress,
@@ -202,13 +262,39 @@ def _open_url(url: str, proxy_url: str | None):
"""打开 URL;指定代理时使用该代理,否则走系统默认代理配置。"""
if proxy_url:
opener = urllib.request.build_opener(
urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
)
opener = _proxy_opener(proxy_url)
return opener.open(url)
return urllib.request.urlopen(url) # noqa: S310
def _proxy_opener(proxy_url: str):
parsed = urllib.parse.urlsplit(proxy_url)
if not parsed.username:
return urllib.request.build_opener(urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url}))
clean_proxy_url = _proxy_url_without_auth(parsed)
password_manager = urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_manager.add_password(
None,
clean_proxy_url,
urllib.parse.unquote(parsed.username),
urllib.parse.unquote(parsed.password or ""),
)
return urllib.request.build_opener(
urllib.request.ProxyHandler({"http": clean_proxy_url, "https": clean_proxy_url}),
urllib.request.ProxyBasicAuthHandler(password_manager),
urllib.request.ProxyDigestAuthHandler(password_manager),
)
def _proxy_url_without_auth(parsed: urllib.parse.SplitResult) -> str:
host = parsed.hostname or ""
if ":" in host and not host.startswith("["):
host = f"[{host}]"
netloc = f"{host}:{parsed.port}" if parsed.port else host
return urllib.parse.urlunsplit((parsed.scheme, netloc, parsed.path, parsed.query, parsed.fragment))
def _extract_from_zip(data: bytes, directory: Path, names: set[str]) -> None:
"""从 zip 中按文件名提取需要的文件,忽略目录层级。"""
@@ -223,9 +309,9 @@ def _requested_files(target: str) -> tuple[str, ...]:
"""把用户选择的下载目标转换为实际文件名。"""
if target == "all":
return REQUIRED_FILES
return required_files()
if target == "xray":
return ("xray",)
return (xray_executable_name(),)
if target == "geoip":
return ("geoip.dat",)
if target == "geosite":
+43 -5
View File
@@ -31,6 +31,8 @@ def generate_xray_config(node: Node, settings: XrayConfigSettings | None = None)
log = _build_log(settings)
if log:
config["log"] = log
if _fakedns_enabled(settings):
config["fakedns"] = [{"ipPool": "198.18.0.0/15", "poolSize": 65535}]
config["routing"]["rules"].extend(_build_dns_routing(settings))
config["routing"]["rules"].append({"type": "field", "inboundTag": _dns_inbound_tags(settings), "outboundTag": "dns-out"})
config["routing"]["rules"].extend(_build_rule_port_routing(settings))
@@ -65,8 +67,7 @@ def _build_inbounds(settings: XrayConfigSettings) -> list[dict[str, Any]]:
inbounds = [
_socks_inbound(settings.inbounds.socks_port, listen, "socks", settings),
_http_inbound(settings.inbounds.http_port, listen, "http", settings),
_socks_inbound(settings.inbounds.rule_socks_port, listen, "rule-socks", settings),
_http_inbound(settings.inbounds.rule_http_port, listen, "rule-http", settings),
_mixed_inbound(settings.inbounds.rule_http_port, listen, "rule-mixed", settings),
_vmess_inbound(settings.inbounds.vmess_port, listen, settings),
]
for custom in settings.inbounds.custom:
@@ -82,7 +83,7 @@ def _build_inbounds(settings: XrayConfigSettings) -> list[dict[str, Any]]:
def _socks_inbound(port: int, listen: str, tag: str, settings: XrayConfigSettings) -> dict[str, Any]:
return _with_sniffing(
{"port": port, "listen": listen, "protocol": "socks", "settings": {"auth": "noauth", "udp": True}, "tag": tag},
{"port": port, "listen": listen, "protocol": "socks", "settings": _inbound_proxy_settings(settings), "tag": tag},
settings,
)
@@ -91,6 +92,19 @@ def _http_inbound(port: int, listen: str, tag: str, settings: XrayConfigSettings
return _with_sniffing({"port": port, "listen": listen, "protocol": "http", "tag": tag}, settings)
def _mixed_inbound(port: int, listen: str, tag: str, settings: XrayConfigSettings) -> dict[str, Any]:
return _with_sniffing(
{
"port": port,
"listen": listen,
"protocol": "mixed",
"settings": _inbound_proxy_settings(settings),
"tag": tag,
},
settings,
)
def _vmess_inbound(port: int, listen: str, settings: XrayConfigSettings) -> dict[str, Any]:
if port <= 0:
return {"port": 0}
@@ -101,6 +115,14 @@ def _vmess_inbound(port: int, listen: str, settings: XrayConfigSettings) -> dict
)
def _inbound_proxy_settings(settings: XrayConfigSettings) -> dict[str, Any]:
proxy_settings: dict[str, Any] = {"auth": "noauth", "udp": True, "allowTransparent": False}
if settings.inbounds.auth_user and settings.inbounds.auth_password:
proxy_settings["auth"] = "password"
proxy_settings["accounts"] = [{"user": settings.inbounds.auth_user, "pass": settings.inbounds.auth_password}]
return proxy_settings
def _transparent_inbounds(settings: XrayConfigSettings) -> list[dict[str, Any]]:
if settings.transparent.mode == "close":
return []
@@ -137,9 +159,12 @@ def _transparent_inbounds(settings: XrayConfigSettings) -> list[dict[str, Any]]:
def _with_sniffing(inbound: dict[str, Any], settings: XrayConfigSettings) -> dict[str, Any]:
if settings.inbounds.inbound_sniffing == "disable":
return inbound
dest_override = settings.inbounds.inbound_sniffing.split(",")
if _fakedns_enabled(settings) and "fakedns" not in dest_override:
dest_override.append("fakedns")
inbound["sniffing"] = {
"enabled": True,
"destOverride": settings.inbounds.inbound_sniffing.split(","),
"destOverride": dest_override,
"domainsExcluded": _split_lines(settings.inbounds.domains_excluded),
"routeOnly": settings.inbounds.route_only,
}
@@ -187,6 +212,9 @@ def _dns_inbound_tags(settings: XrayConfigSettings) -> list[str]:
def _build_dns(settings: XrayConfigSettings, node: Node) -> dict[str, Any]:
servers: list[Any] = []
fakedns_domains = _fakedns_domains(settings)
if fakedns_domains:
servers.append({"address": "fakedns", "domains": fakedns_domains})
routing_domains = _domains_to_lookup(settings, node)
for rule in settings.dns.rules:
domains = _split_lines(rule.domains)
@@ -210,6 +238,16 @@ def _build_dns(settings: XrayConfigSettings, node: Node) -> dict[str, Any]:
return dns
def _fakedns_enabled(settings: XrayConfigSettings) -> bool:
return settings.dns.special_mode == "fakedns"
def _fakedns_domains(settings: XrayConfigSettings) -> list[str]:
if not _fakedns_enabled(settings):
return []
return _split_lines(settings.dns.fakedns_domains) or ["geosite:geolocation-!cn"]
def _dns_server(rule: DnsRuleSettings, domains: list[str]) -> Any:
address, port = _parse_dns_addr(rule.server)
server_address = rule.server if "://" in rule.server else address
@@ -239,7 +277,7 @@ def _build_dns_routing(settings: XrayConfigSettings) -> list[dict[str, Any]]:
def _build_rule_port_routing(settings: XrayConfigSettings) -> list[dict[str, Any]]:
tags = [tag for tag, port in (("rule-http", settings.inbounds.rule_http_port), ("rule-socks", settings.inbounds.rule_socks_port)) if port > 0]
tags = ["rule-mixed"] if settings.inbounds.rule_http_port > 0 else []
if not tags:
return []
mode = settings.routing.mode
+6
View File
@@ -43,6 +43,8 @@ class InboundSettings:
http_port: int = 20171
rule_socks_port: int = 0
rule_http_port: int = 20172
auth_user: str = ""
auth_password: str = ""
vmess_port: int = 0
inbound_sniffing: str = "http,tls,quic"
route_only: bool = False
@@ -83,6 +85,7 @@ class TransparentSettings:
docker_transparent: bool = True
docker_transparent_cidrs: str = "172.16.0.0/12"
tproxy_excluded_interfaces: str = "docker*,veth*,wg*,ppp*,br-*"
output_bypass_rules: str = ""
tproxy_white_country_codes: list[str] = field(default_factory=list)
tproxy_white_custom_ips: list[str] = field(default_factory=list)
tun_bypass_interfaces: str = ""
@@ -123,6 +126,7 @@ class DnsSettings:
)
antipollution: str = "closed"
special_mode: str = "none"
fakedns_domains: str = "geosite:geolocation-!cn"
@dataclass(slots=True)
@@ -201,6 +205,8 @@ def validate_settings(settings: XrayConfigSettings) -> None:
if not 1 <= settings.core.mux_concurrency <= 1024:
raise ValueError("core.mux_concurrency must be between 1 and 1024")
_validate_choice(settings.inbounds.inbound_sniffing, {"disable", "http,tls", "http,tls,quic"}, "inbounds.inbound_sniffing")
if bool(settings.inbounds.auth_user) != bool(settings.inbounds.auth_password):
raise ValueError("inbounds.auth_user and inbounds.auth_password must be set together")
_validate_choice(settings.routing.mode, {"whitelist", "gfwlist", "custom", "routingA", "proxy", "direct", "block"}, "routing.mode")
_validate_choice(settings.routing.default_rule, {"direct", "proxy", "block"}, "routing.default_rule")
_validate_choice(settings.transparent.mode, {"close", "proxy", "whitelist", "gfwlist", "pac"}, "transparent.mode")
@@ -182,6 +182,7 @@ def _redirect_rules(settings: XrayConfigSettings, *, backend: str, ipv6: bool, n
f"iptables -w 2 -t nat -A TP_RULE -p tcp -j REDIRECT --to-ports {settings.transparent.port}",
"iptables -w 2 -t nat -I PREROUTING -p tcp -j TP_PRE",
"iptables -w 2 -t nat -I OUTPUT -p tcp -j TP_OUT",
*_iptables_output_bypass_rules(settings, table="nat", mode="redirect"),
*_redirect_prerouting_jumps(settings),
"iptables -w 2 -t nat -A TP_OUT -j TP_RULE",
]
@@ -237,6 +238,7 @@ def _tproxy_rules(
"iptables -w 2 -t mangle -I OUTPUT -j TP_OUT",
"iptables -w 2 -t mangle -I PREROUTING -j TP_PRE",
"iptables -w 2 -t mangle -A TP_OUT -m mark --mark 0x80/0x80 -j RETURN",
*_iptables_output_bypass_rules(settings, table="mangle", mode="tproxy"),
"iptables -w 2 -t mangle -A TP_OUT -p tcp -m addrtype --src-type LOCAL ! --dst-type LOCAL -j TP_RULE",
"iptables -w 2 -t mangle -A TP_OUT -p udp -m addrtype --src-type LOCAL ! --dst-type LOCAL -j TP_RULE",
"iptables -w 2 -t mangle -A TP_PRE -i lo -m mark ! --mark 0x40/0xc0 -j RETURN",
@@ -367,6 +369,7 @@ def _redirect_nft_table(settings: XrayConfigSettings, *, ipv6: bool) -> str:
chain tp_out {{
type nat hook output priority -105
{_nft_output_bypass_rules(settings, mode="redirect")}
{nfproto} meta l4proto tcp jump tp_rule
}}
}}"""
@@ -394,6 +397,7 @@ def _tproxy_nft_table(settings: XrayConfigSettings, *, ipv6: bool, tproxy_white_
chain tp_out {{
meta mark & 0x80 == 0x80 return
{_nft_output_bypass_rules(settings, mode="tproxy")}
meta l4proto {{ tcp, udp }} fib saddr type local fib daddr type != local jump tp_rule
}}
@@ -585,6 +589,65 @@ def _iptables_interface(value: str) -> str:
return value.replace("*", "+")
def _output_bypass_entries(settings: XrayConfigSettings) -> list[tuple[str, str, int | None]]:
entries: list[tuple[str, str, int | None]] = []
for raw in settings.transparent.output_bypass_rules.replace(",", "\n").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) != 2:
continue
protocol = parts[0].lower()
if protocol not in {"tcp", "udp", "all"}:
continue
target, port = _split_bypass_target(parts[1])
try:
ipaddress.ip_network(target, strict=False)
except ValueError:
continue
entries.append((protocol, target, port))
return entries
def _split_bypass_target(value: str) -> tuple[str, int | None]:
if ":" not in value:
return value, None
host, raw_port = value.rsplit(":", 1)
if not host or not raw_port.isdigit():
return value, None
port = int(raw_port)
if not 1 <= port <= 65535:
return value, None
return host, port
def _iptables_output_bypass_rules(settings: XrayConfigSettings, *, table: str, mode: str) -> list[str]:
lines: list[str] = []
allowed_protocols = {"tcp"} if mode == "redirect" else {"tcp", "udp"}
for protocol, target, port in _output_bypass_entries(settings):
protocols = sorted(allowed_protocols) if protocol == "all" else [protocol]
for item in protocols:
if item not in allowed_protocols:
continue
port_match = f" --dport {port}" if port is not None else ""
lines.append(f"iptables -w 2 -t {table} -A TP_OUT -p {item} -d {target}{port_match} -j RETURN")
return lines
def _nft_output_bypass_rules(settings: XrayConfigSettings, *, mode: str) -> str:
lines: list[str] = []
allowed_protocols = {"tcp"} if mode == "redirect" else {"tcp", "udp"}
for protocol, target, port in _output_bypass_entries(settings):
protocols = sorted(allowed_protocols) if protocol == "all" else [protocol]
for item in protocols:
if item not in allowed_protocols:
continue
port_match = f" th dport {port}" if port is not None else ""
lines.append(f" ip daddr {target} meta l4proto {item}{port_match} return")
return "\n".join(lines)
def _nft_set(name: str, kind: str, values: list[str]) -> str:
elements = ",\n ".join(values)
return f""" set {name} {{
+39 -1
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
import json
import os
import re
import signal
import socket
import subprocess
@@ -218,6 +219,17 @@ def read_log_since(path: str | Path, offset: int) -> tuple[str, int]:
return _tail_lines(content, 1000), size
def compact_xray_log(content: str) -> str:
"""Convert verbose Xray routing logs into concise route decisions."""
entries: list[str] = []
for line in content.splitlines():
entry = _compact_log_line(line)
if entry:
entries.append(entry)
return "\n".join(entries)
def log_file_size(path: str | Path) -> int:
resolved = Path(path)
if not resolved.exists():
@@ -230,6 +242,32 @@ def _tail_lines(content: str, line_count: int) -> str:
return "\n".join(lines[-line_count:])
def _compact_log_line(line: str) -> str | None:
match = _DETOUR_RE.search(line)
if match:
target = _compact_target(match.group("target"))
return f"{_log_time(line)} {target} -> {match.group('outbound')}".strip()
if _IMPORTANT_LOG_RE.search(line):
return line.strip()
return None
def _compact_target(target: str) -> str:
if target.startswith(("tcp:", "udp:")):
return target.split(":", 1)[1]
return target
def _log_time(line: str) -> str:
match = _LOG_TIME_RE.match(line)
return match.group(1) if match else ""
_LOG_TIME_RE = re.compile(r"^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})")
_DETOUR_RE = re.compile(r"taking detour \[(?P<outbound>[^\]]+)\] for \[(?P<target>[^\]]+)\]")
_IMPORTANT_LOG_RE = re.compile(r"\[(Warning|Error)\]|\b(failed|error|timeout|denied)\b", re.IGNORECASE)
def _inbound_port_errors(inbound: dict) -> list[str]:
port = int(inbound.get("port") or 0)
if port <= 0:
@@ -254,7 +292,7 @@ def _inbound_networks(inbound: dict) -> set[str]:
return {item.strip() for item in network.split(",") if item.strip() in {"tcp", "udp"}}
if protocol == "dokodemo-door":
return {"tcp", "udp"}
if protocol == "socks" and (inbound.get("settings") or {}).get("udp"):
if protocol in {"socks", "mixed"} and (inbound.get("settings") or {}).get("udp"):
return {"tcp", "udp"}
return {"tcp"}
+14
View File
@@ -0,0 +1,14 @@
from __future__ import annotations
import logging
from flask import Flask
logger = logging.getLogger("pyxray")
def log_activity(app: Flask, message: str) -> None:
"""Write an operator-facing activity message to stdout/stderr logging."""
logger.info(message)
+4 -2
View File
@@ -4,7 +4,7 @@ from dataclasses import asdict
from flask import Blueprint, Flask, current_app, render_template
from pyxray.libs.xray_assets import DEFAULT_VERSION, check_xray_assets, official_archive_url
from pyxray.libs.xray_assets import check_xray_assets, default_xray_version, official_archive_url
from pyxray.libs.xray_config.store import dump_settings_toml
from pyxray.web.nodes import get_node_manager
from pyxray.web.xray_assets import asset_form_from_settings, get_asset_settings_store
@@ -31,6 +31,7 @@ def index(): # noqa: ANN202
manager = get_node_manager(current_app)
nodes = [node.to_dict() for node in manager.list_nodes()]
selected_id = manager.selected_id()
selected_node = manager.get_selected_node()
settings_store = get_settings_store(current_app)
settings = settings_store.load()
service_status = get_xray_service(current_app).status()
@@ -41,9 +42,10 @@ def index(): # noqa: ANN202
status=asdict(status),
ready=status.ready,
missing=status.missing,
official_url=official_archive_url(form["version"] or DEFAULT_VERSION),
official_url=official_archive_url(form["version"] or default_xray_version()),
nodes=nodes,
selected_id=selected_id,
selected_name=selected_node.name if selected_node is not None else "",
settings=settings.to_dict(),
settings_toml=dump_settings_toml(settings),
config_path=current_app.config["XRAY_CONFIG_PATH"],
+13 -1
View File
@@ -5,6 +5,7 @@ from pathlib import Path
from flask import Blueprint, Flask, current_app, jsonify, request
from pyxray.libs.nodes import NodeManager, NodeStore
from pyxray.web.activity_log import log_activity
blueprint = Blueprint("nodes", __name__, url_prefix="/api/nodes")
@@ -42,6 +43,9 @@ def import_nodes_api(): # noqa: ANN202
text = request.form.get("links", "").strip()
results = get_node_manager(current_app).import_links(text)
imported = sum(1 for item in results if item.node is not None)
failed = len(results) - imported
log_activity(current_app, f"Nodes imported: imported={imported} failed={failed}")
return jsonify(
{
"results": [
@@ -66,7 +70,14 @@ def select_node_api(): # noqa: ANN202
node = get_node_manager(current_app).select_node(node_id)
except ValueError as exc:
return jsonify({"error": str(exc)}), 404
return jsonify({"node": node.to_dict()})
log_activity(current_app, f"Node selected: {node.name or node.id}")
try:
from pyxray.web.xray_service import restart_xray_service_if_running
restart_status = restart_xray_service_if_running(current_app, reason="node selected")
except Exception as exc: # noqa: BLE001
return jsonify({"error": str(exc), "node": node.to_dict()}), 400
return jsonify({"node": node.to_dict(), "service": restart_status})
@blueprint.delete("/<node_id>")
@@ -74,4 +85,5 @@ def delete_node_api(node_id: str): # noqa: ANN202
"""删除节点。"""
removed = get_node_manager(current_app).remove_node(node_id)
log_activity(current_app, f"Node deleted: {node_id} removed={removed}")
return jsonify({"removed": removed})
+39 -10
View File
@@ -1,12 +1,17 @@
from __future__ import annotations
import atexit
import logging
import os
import signal
import socket
import sys
from pathlib import Path
from flask import Flask
from pyxray import __version__
from pyxray.libs.app_data import resolve_app_data_paths
from pyxray.web.dashboard import register_dashboard
from pyxray.web.jobs import init_job_store
from pyxray.web.nodes import register_nodes
@@ -24,13 +29,13 @@ def create_app(
"""创建 pyxray Web 应用。"""
app = Flask(__name__)
data_dir = Path(default_data_dir) if default_data_dir is not None else _default_data_dir(default_xray_dir)
config_path = data_dir / "config.json"
paths = resolve_app_data_paths(default_xray_dir, data_dir=default_data_dir)
config_path = paths.generated_config
init_job_store(app, run_sync=run_jobs_sync)
register_xray_assets(app, default_xray_dir, data_dir / "download.toml")
register_nodes(app, data_dir / "nodes.toml")
register_xray_config(app, data_dir / "settings.toml", config_path)
register_xray_service(app, xray_dir=default_xray_dir, config_path=config_path, log_path=data_dir / "xray.log")
register_xray_assets(app, paths.xray_dir, paths.download_settings)
register_nodes(app, paths.nodes)
register_xray_config(app, paths.settings, config_path)
register_xray_service(app, xray_dir=paths.xray_dir, config_path=config_path, log_path=paths.log)
_bind_xray_lifecycle(app)
register_dashboard(app)
return app
@@ -39,14 +44,38 @@ def create_app(
def run_web(host: str, port: int, default_xray_dir: str | Path = "data/xray") -> None:
"""启动开发模式 Web 服务。"""
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s", force=True)
logging.getLogger("pyxray").setLevel(logging.INFO)
if os.environ.get("PYXRAY_ACCESS_LOG") not in {"1", "true", "yes", "on"}:
logging.getLogger("werkzeug").disabled = True
_print_startup_banner(host, port)
create_app(default_xray_dir).run(host=host, port=port)
def _default_data_dir(default_xray_dir: str | Path) -> Path:
"""根据资源目录推导默认数据目录。"""
def _print_listen_urls(host: str, port: int) -> None:
if host in {"0.0.0.0", "::"}:
print(f" * Pyxray URL: http://127.0.0.1:{port}", flush=True)
lan_ip = _primary_lan_ip()
if lan_ip:
print(f" * Pyxray URL: http://{lan_ip}:{port}", flush=True)
return
print(f" * Pyxray URL: http://{host}:{port}", flush=True)
path = Path(default_xray_dir)
return path.parent if path.name == "xray" else path
def _print_startup_banner(host: str, port: int) -> None:
print(f" * Pyxray version: {__version__}", flush=True)
_print_listen_urls(host, port)
def _primary_lan_ip() -> str | None:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect(("8.8.8.8", 80))
return sock.getsockname()[0]
except OSError:
return None
finally:
sock.close()
def _bind_xray_lifecycle(app: Flask) -> None:
+53 -8
View File
@@ -12,9 +12,9 @@ const mainProgressBar = document.querySelector("#main-progress-bar");
const mainProgressText = document.querySelector("#main-progress-text");
const statusCards = Array.from(document.querySelectorAll("[data-asset-name]"));
const serviceToggle = document.querySelector("#xray-service-toggle");
const serviceState = document.querySelector("#xray-service-state");
const refreshLogsButton = document.querySelector("#refresh-logs-button");
const clearLogsButton = document.querySelector("#clear-logs-button");
const logFormatSelect = document.querySelector("#log-format-select");
const logContent = document.querySelector("#xray-log-content");
const activeJobKey = "pyxray.activeAssetJobId";
const activeFormKey = "pyxray.activeAssetForm";
@@ -134,6 +134,9 @@ nodeList.addEventListener("click", async (event) => {
}
selectedNodeId = payload.node.id;
renderNodes();
if (payload.service) renderServiceStatus(payload.service);
else refreshServiceStatus();
await refreshLogs();
showBox(nodeMessage, `已选择:${payload.node.name}`, "done");
}
@@ -147,6 +150,7 @@ nodeList.addEventListener("click", async (event) => {
if (selectedNodeId === nodeId) selectedNodeId = "";
showBox(nodeMessage, payload.removed ? "节点已删除" : "节点不存在", payload.removed ? "done" : "warn");
await refreshNodes();
refreshServiceStatus();
}
});
@@ -163,6 +167,8 @@ settingsForm.addEventListener("submit", async (event) => {
}
settingsSnapshot = serializeForm(settingsForm);
updateSettingsActions();
if (payload.service) renderServiceStatus(payload.service);
await refreshLogs();
showBox(configMessage, "设置已保存", "done");
});
@@ -241,8 +247,8 @@ function initServiceControls() {
renderServiceStatus(payload);
await refreshLogs();
} catch (error) {
serviceState.textContent = error.message;
serviceState.className = "rounded-full border border-red-500/30 bg-red-500/10 px-4 py-2 font-mono text-xs text-red-300";
serviceToggle.textContent = truncateServiceLabel(error.message);
serviceToggle.className = "rounded-full border border-red-500/30 bg-red-500/10 px-5 py-2 text-sm font-medium text-red-300";
await refreshServiceStatus();
await refreshLogs();
if (logContent && !logContent.textContent.includes(error.message)) {
@@ -260,6 +266,7 @@ function initLogControls() {
if (!refreshLogsButton) return;
refreshLogsButton.addEventListener("click", refreshLogs);
clearLogsButton.addEventListener("click", clearLogs);
logFormatSelect?.addEventListener("change", loadLogs);
resetLogOffset();
setInterval(refreshLogs, 2000);
}
@@ -272,17 +279,24 @@ async function refreshServiceStatus() {
function renderServiceStatus(status) {
serviceToggle.dataset.running = status.running ? "true" : "false";
serviceToggle.textContent = status.running ? "停止 Xray" : "启动 Xray";
serviceToggle.textContent = `${status.running ? "关闭" : "开启"}: ${selectedNodeNameShort()}`;
serviceToggle.className = status.running
? "rounded-full border border-red-500/30 bg-red-500/10 px-5 py-2 text-sm font-medium text-red-300"
: "rounded-full border border-emerald-500/30 bg-emerald-500/10 px-5 py-2 text-sm font-medium text-emerald-300";
serviceState.textContent = status.running ? `pid: ${status.pid}` : "xray: stopped";
serviceState.className = "rounded-full border border-zinc-800 bg-zinc-900 px-4 py-2 font-mono text-xs text-zinc-400";
}
function selectedNodeNameShort() {
const selected = nodes.find((node) => node.id === selectedNodeId);
return truncateServiceLabel(selected?.name || "未选择");
}
function truncateServiceLabel(value) {
return value.length > 8 ? value.slice(0, 8) : value;
}
async function refreshLogs() {
if (!logContent) return;
const query = logOffset === null ? "?offset=end" : `?offset=${encodeURIComponent(logOffset)}`;
const query = logQuery(logOffset === null ? "end" : logOffset);
const response = await fetch(`/api/xray/service/logs${query}`);
const payload = await response.json();
if (!response.ok) {
@@ -315,15 +329,46 @@ async function clearLogs() {
}
}
async function loadLogs() {
if (!logContent) return;
const response = await fetch(`/api/xray/service/logs${logFormatQuery()}`);
const payload = await response.json();
if (!response.ok) {
logContent.textContent = payload.error || `HTTP ${response.status}`;
return;
}
if (typeof payload.offset === "number") logOffset = payload.offset;
logContent.textContent = payload.content || "暂无日志。";
logContent.scrollTop = logContent.scrollHeight;
}
async function resetLogOffset() {
if (!logContent) return;
const response = await fetch("/api/xray/service/logs?offset=end");
const response = await fetch(`/api/xray/service/logs${logQuery("end")}`);
if (!response.ok) return;
const payload = await response.json();
logOffset = payload.offset || 0;
logContent.textContent = "暂无日志。";
}
function logQuery(offset) {
const params = new URLSearchParams();
params.set("offset", String(offset));
if ((logFormatSelect?.value || "compact") === "compact") {
params.set("format", "compact");
}
return `?${params.toString()}`;
}
function logFormatQuery() {
const params = new URLSearchParams();
if ((logFormatSelect?.value || "compact") === "compact") {
params.set("format", "compact");
}
const query = params.toString();
return query ? `?${query}` : "";
}
function updateSettingsActions() {
configActions.classList.toggle("is-visible", serializeForm(settingsForm) !== settingsSnapshot);
}
+4
View File
@@ -19,4 +19,8 @@
</label>
<input name="core.tcp_fast_open" type="hidden" value="default" />
</div>
<label class="config-field mt-4">
<span>transparent</span>
<textarea name="transparent.output_bypass_rules" rows="3" placeholder="tcp 117.72.47.28:33010&#10;all 192.168.0.0/24">{{ settings.transparent.output_bypass_rules }}</textarea>
</label>
</section>
+4
View File
@@ -43,6 +43,10 @@
</select>
</label>
</div>
<label class="config-field mt-4">
<span>FakeDNS 域名范围,每行一个 domain/geosite/keyword 规则</span>
<textarea name="dns.fakedns_domains" placeholder="geosite:geolocation-!cn">{{ settings.dns.fakedns_domains }}</textarea>
</label>
<label class="config-field mt-4">
<span>DNS 规则,每行 server|domains|outbound</span>
<textarea name="dns.rules">{% for item in settings.dns.rules %}{{ item.server }}|{{ item.domains }}|{{ item.outbound }}
+19 -12
View File
@@ -1,22 +1,28 @@
<section class="config-card">
<h3 class="config-card-title">入站端口</h3>
<label class="config-field">
<span>监听地址</span>
<select name="inbounds.listen">
{% for value in ["127.0.0.1", "0.0.0.0"] %}
<option value="{{ value }}" {% if settings.inbounds.listen == value %}selected{% endif %}>{{ value }}</option>
{% endfor %}
</select>
</label>
<div class="config-grid">
<label class="config-field">
<span>监听地址</span>
<select name="inbounds.listen">
{% for value in ["127.0.0.1", "0.0.0.0"] %}
<option value="{{ value }}" {% if settings.inbounds.listen == value %}selected{% endif %}>{{ value }}</option>
{% endfor %}
</select>
</label>
<label class="config-field">
<span>Mixed 端口</span>
<input name="inbounds.rule_http_port" type="number" min="0" max="65535" value="{{ settings.inbounds.rule_http_port }}" />
</label>
</div>
<div class="config-grid mt-4">
<label class="config-field">
<span>规则 SOCKS 端口</span>
<input name="inbounds.rule_socks_port" type="number" min="0" max="65535" value="{{ settings.inbounds.rule_socks_port or settings.inbounds.socks_port }}" />
<span>认证用户名</span>
<input name="inbounds.auth_user" type="text" autocomplete="username" value="{{ settings.inbounds.auth_user }}" placeholder="留空使用 noauth" />
</label>
<label class="config-field">
<span>规则 HTTP 端口</span>
<input name="inbounds.rule_http_port" type="number" min="0" max="65535" value="{{ settings.inbounds.rule_http_port }}" />
<span>认证密码</span>
<input name="inbounds.auth_password" type="password" autocomplete="current-password" value="{{ settings.inbounds.auth_password }}" placeholder="留空使用 noauth" />
</label>
</div>
@@ -40,6 +46,7 @@
<input name="inbounds.socks_port" type="hidden" value="0" />
<input name="inbounds.http_port" type="hidden" value="0" />
<input name="inbounds.rule_socks_port" type="hidden" value="0" />
<input name="inbounds.vmess_port" type="hidden" value="0" />
<input name="inbounds.api.port" type="hidden" value="0" />
<input name="inbounds.port_sharing" type="hidden" value="off" />
+1 -7
View File
@@ -21,14 +21,8 @@
type="button"
data-running="{{ 'true' if service_status.running else 'false' }}"
>
{{ "停止 Xray" if service_status.running else "启动 Xray" }}
{{ "关闭" if service_status.running else "开启" }}: {{ selected_name[:8] if selected_name else "未选择" }}
</button>
<div id="xray-service-state" class="rounded-full border border-zinc-800 bg-zinc-900 px-4 py-2 font-mono text-xs text-zinc-400">
{{ "pid: " ~ service_status.pid if service_status.running else "xray: stopped" }}
</div>
<div class="rounded-full border border-zinc-800 bg-zinc-900 px-4 py-2 font-mono text-xs text-zinc-400">
config: {{ config_path }}
</div>
</div>
</header>
@@ -35,7 +35,7 @@
<label class="grid gap-2">
<span class="text-sm text-zinc-400">下载代理</span>
<input class="rounded-2xl border border-zinc-700 bg-zinc-950 px-4 py-3 outline-none focus:border-zinc-400" name="proxy_url" placeholder="空则使用系统代理;系统没有代理则直连" value="{{ form.proxy_url }}" />
<input class="rounded-2xl border border-zinc-700 bg-zinc-950 px-4 py-3 outline-none focus:border-zinc-400" name="proxy_url" placeholder="例如 http://user:pass@127.0.0.1:20172空则使用系统代理" value="{{ form.proxy_url }}" />
</label>
<label class="grid gap-2">
+7 -1
View File
@@ -4,7 +4,13 @@
<h2 class="text-xl font-medium">运行日志</h2>
<p class="mt-1 text-sm text-zinc-500">Xray stdout / stderr,日志文件:{{ log_path }}</p>
</div>
<div class="flex flex-wrap gap-3">
<div class="flex flex-wrap items-center gap-3">
<label class="flex items-center gap-2 text-sm text-zinc-400">
<select id="log-format-select" class="rounded-2xl border border-zinc-700 bg-zinc-950 px-4 py-3 text-sm font-medium text-zinc-100 outline-none focus:border-emerald-500">
<option value="compact" selected>解析优化日志</option>
<option value="raw">原始日志</option>
</select>
</label>
<button id="refresh-logs-button" class="rounded-2xl border border-zinc-700 bg-zinc-950 px-5 py-3 text-sm font-medium text-zinc-100 hover:bg-zinc-800" type="button">刷新日志</button>
<button id="clear-logs-button" class="rounded-2xl border border-red-500/30 bg-red-500/10 px-5 py-3 text-sm font-medium text-red-300 hover:bg-red-500/20" type="button">清除日志</button>
</div>
+7 -2
View File
@@ -7,10 +7,12 @@ from flask import Blueprint, Flask, current_app, jsonify, request
from pyxray.libs.xray_assets import (
DEFAULT_VERSION,
check_xray_assets,
default_xray_version,
download_bytes_stream,
ensure_xray_assets,
)
from pyxray.libs.xray_asset_settings import XrayAssetSettings, XrayAssetSettingsStore
from pyxray.web.activity_log import log_activity
from pyxray.web.jobs import Job, get_job_store
@@ -35,6 +37,7 @@ def get_asset_settings_store(app: Flask) -> XrayAssetSettingsStore:
def ensure_api(): # noqa: ANN202
form = _form_values()
get_asset_settings_store(current_app).save(_settings_from_form(form))
log_activity(current_app, f"Asset ensure started: target={form['target'] or 'all'} directory={form['directory']}")
job = get_job_store(current_app).start(lambda item: _run_asset_job(item, form), payload=form)
return jsonify({"job_id": job["id"]})
@@ -50,6 +53,7 @@ def save_asset_settings_api(): # noqa: ANN202
form = _form_values()
settings = _settings_from_form(form)
get_asset_settings_store(current_app).save(settings)
log_activity(current_app, f"Asset settings saved: directory={settings.directory} version={settings.version}")
return jsonify(settings.to_dict())
@@ -66,6 +70,7 @@ def cancel_job_api(job_id: str): # noqa: ANN202
job = get_job_store(current_app).cancel(job_id)
if job is None:
return jsonify({"error": "job not found"}), 404
log_activity(current_app, f"Asset job cancelled: {job_id}")
return jsonify({"job_id": job_id, "cancel_requested": True})
@@ -74,7 +79,7 @@ def default_asset_form(directory: str) -> dict[str, str]:
return {
"directory": directory,
"version": DEFAULT_VERSION,
"version": default_xray_version(),
"archive_url": "",
"geoip_url": "",
"geosite_url": "",
@@ -101,7 +106,7 @@ def _form_values() -> dict[str, str]:
def _settings_from_form(form: dict[str, str]) -> XrayAssetSettings:
return XrayAssetSettings(
directory=form["directory"],
version=form["version"] or DEFAULT_VERSION,
version=form["version"] or default_xray_version(),
archive_url=form["archive_url"],
geoip_url=form["geoip_url"],
geosite_url=form["geosite_url"],
+17 -1
View File
@@ -19,6 +19,7 @@ from pyxray.libs.xray_config.settings import (
OutboundSetting,
)
from pyxray.libs.xray_config.store import dump_settings_toml
from pyxray.web.activity_log import log_activity
from pyxray.web.nodes import get_node_manager
@@ -66,7 +67,14 @@ def save_settings_api(): # noqa: ANN202
get_settings_store(current_app).save(settings)
except Exception as exc: # noqa: BLE001
return jsonify({"error": str(exc)}), 400
return jsonify({"settings_toml": dump_settings_toml(settings)})
log_activity(current_app, "Xray settings saved")
try:
from pyxray.web.xray_service import restart_xray_service_if_running
restart_status = restart_xray_service_if_running(current_app, reason="settings saved")
except Exception as exc: # noqa: BLE001
return jsonify({"error": str(exc), "settings_toml": dump_settings_toml(settings)}), 400
return jsonify({"settings_toml": dump_settings_toml(settings), "service": restart_status})
@blueprint.post("/generate")
@@ -77,6 +85,7 @@ def generate_config_api(): # noqa: ANN202
generated = generate_current_xray_config(current_app)
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
log_activity(current_app, f"Xray config generated: {generated['config_path']}")
return jsonify(generated)
@@ -140,6 +149,8 @@ def _settings_from_request() -> XrayConfigSettings:
settings.inbounds.http_port = _int("inbounds.http_port", settings.inbounds.http_port)
settings.inbounds.rule_socks_port = _int("inbounds.rule_socks_port", settings.inbounds.rule_socks_port)
settings.inbounds.rule_http_port = _int("inbounds.rule_http_port", settings.inbounds.rule_http_port)
settings.inbounds.auth_user = form.get("inbounds.auth_user", settings.inbounds.auth_user).strip()
settings.inbounds.auth_password = form.get("inbounds.auth_password", settings.inbounds.auth_password).strip()
settings.inbounds.vmess_port = _int("inbounds.vmess_port", settings.inbounds.vmess_port)
settings.inbounds.inbound_sniffing = form.get("inbounds.inbound_sniffing", settings.inbounds.inbound_sniffing)
settings.inbounds.route_only = _enabled("inbounds.route_only")
@@ -172,6 +183,10 @@ def _settings_from_request() -> XrayConfigSettings:
"transparent.tproxy_excluded_interfaces",
settings.transparent.tproxy_excluded_interfaces,
)
settings.transparent.output_bypass_rules = form.get(
"transparent.output_bypass_rules",
settings.transparent.output_bypass_rules,
)
settings.transparent.tproxy_white_country_codes = _lines("transparent.tproxy_white_country_codes")
settings.transparent.tproxy_white_custom_ips = _lines("transparent.tproxy_white_custom_ips")
settings.transparent.tun_bypass_interfaces = form.get("transparent.tun_bypass_interfaces", settings.transparent.tun_bypass_interfaces)
@@ -188,6 +203,7 @@ def _settings_from_request() -> XrayConfigSettings:
settings.dns.local_dns_listen = _bool("dns.local_dns_listen")
settings.dns.antipollution = form.get("dns.antipollution", settings.dns.antipollution)
settings.dns.special_mode = form.get("dns.special_mode", settings.dns.special_mode)
settings.dns.fakedns_domains = form.get("dns.fakedns_domains", settings.dns.fakedns_domains)
settings.dns.rules = [
DnsRuleSettings(server=parts[0], domains=parts[1], outbound=parts[2])
for parts in _split_table("dns.rules", 3)
+37 -3
View File
@@ -6,8 +6,9 @@ from typing import Any
from flask import Blueprint, Flask, current_app, jsonify, request
from pyxray.libs.xray_runtime import XrayServiceManager, log_file_size, read_log_since, read_log_tail
from pyxray.libs.xray_runtime import XrayServiceManager, compact_xray_log, log_file_size, read_log_since, read_log_tail
from pyxray.libs.xray_transparent_runtime import TransparentRuntime
from pyxray.web.activity_log import log_activity
from pyxray.web.xray_assets import get_asset_settings_store
from pyxray.web.xray_config import generate_current_xray_config, get_settings_store
@@ -51,17 +52,22 @@ def status_api(): # noqa: ANN202
@blueprint.post("/start")
def start_api(): # noqa: ANN202
try:
log_activity(current_app, "Xray start requested")
status = start_xray_service(current_app)
save_service_state(current_app, desired_running=True)
log_activity(current_app, f"Xray started: pid={status['pid']}")
return jsonify(status)
except Exception as exc: # noqa: BLE001
log_activity(current_app, f"Xray start failed: {exc}")
return jsonify({"error": str(exc), "status": get_xray_service(current_app).status()}), 400
@blueprint.post("/stop")
def stop_api(): # noqa: ANN202
log_activity(current_app, "Xray stop requested")
status = get_xray_service(current_app).stop()
save_service_state(current_app, desired_running=False)
log_activity(current_app, "Xray stopped")
return jsonify(status)
@@ -69,20 +75,27 @@ def stop_api(): # noqa: ANN202
def logs_api(): # noqa: ANN202
path = current_app.config["XRAY_LOG_PATH"]
offset = request.args.get("offset")
compact = request.args.get("format") == "compact"
if offset == "end":
size = log_file_size(path)
return jsonify({"path": path, "content": "", "offset": size})
if offset is not None:
content, size = read_log_since(path, int(offset or 0))
if compact:
content = compact_xray_log(content)
return jsonify({"path": path, "content": content, "offset": size})
return jsonify({"path": path, "content": read_log_tail(path), "offset": log_file_size(path)})
content = read_log_tail(path)
if compact:
content = compact_xray_log(content)
return jsonify({"path": path, "content": content, "offset": log_file_size(path)})
@blueprint.delete("/logs")
def clear_logs_api(): # noqa: ANN202
path = Path(current_app.config["XRAY_LOG_PATH"])
path.parent.mkdir(parents=True, exist_ok=True)
return jsonify({"path": str(path), "content": "", "offset": log_file_size(path)})
path.write_text("", encoding="utf-8")
return jsonify({"path": str(path), "content": "", "offset": 0})
def start_xray_service(app: Flask) -> dict[str, Any]:
@@ -102,16 +115,37 @@ def start_xray_service(app: Flask) -> dict[str, Any]:
return status
def restart_xray_service_if_running(app: Flask, *, reason: str) -> dict[str, Any] | None:
"""Restart Xray after a config-affecting change when it is currently running."""
service = get_xray_service(app)
if not service.status()["running"]:
return None
log_activity(app, f"Xray restart requested: {reason}")
service.stop()
try:
status = start_xray_service(app)
except Exception as exc:
log_activity(app, f"Xray restart failed: {reason}: {exc}")
raise
save_service_state(app, desired_running=True)
log_activity(app, f"Xray restarted: pid={status['pid']}")
return status
def restore_xray_service(app: Flask) -> None:
"""应用启动时按上次用户期望恢复 Xray 运行状态。"""
if not load_service_state(app).get("desired_running", False):
return
try:
log_activity(app, "Xray restore requested")
start_xray_service(app)
_append_service_message(app, "restored desired running state")
log_activity(app, "Xray restored")
except Exception as exc: # noqa: BLE001
_append_service_message(app, f"failed to restore desired running state: {exc}")
log_activity(app, f"Xray restore failed: {exc}")
def load_service_state(app: Flask) -> dict[str, Any]:
+32
View File
@@ -179,6 +179,38 @@ def test_docker_transparent_limits_tproxy_prerouting_to_configured_source_cidrs(
assert "ip saddr 172.16.0.0/12 meta l4proto { tcp, udp }" in nft_rules.nftables
def test_redirect_output_bypass_rules_return_before_transparent_rule() -> None:
settings = XrayConfigSettings()
settings.transparent.mode = "proxy"
settings.transparent.type = "redirect"
settings.transparent.output_bypass_rules = "tcp 117.72.47.28:33010\nall 192.168.0.0/24\nudp 198.51.100.10:3478"
iptables_rules = generate_transparent_rules(settings, backend="iptables")
nft_rules = generate_transparent_rules(settings, backend="nft")
assert "iptables -w 2 -t nat -A TP_OUT -p tcp -d 117.72.47.28 --dport 33010 -j RETURN" in iptables_rules.setup
assert "iptables -w 2 -t nat -A TP_OUT -p tcp -d 192.168.0.0/24 -j RETURN" in iptables_rules.setup
assert "iptables -w 2 -t nat -A TP_OUT -p udp" not in iptables_rules.setup
assert iptables_rules.setup.index("-d 117.72.47.28 --dport 33010") < iptables_rules.setup.index("iptables -w 2 -t nat -A TP_OUT -j TP_RULE")
assert "ip daddr 117.72.47.28 meta l4proto tcp th dport 33010 return" in nft_rules.nftables
assert "meta l4proto udp" not in nft_rules.nftables
def test_tproxy_output_bypass_rules_support_udp_and_all() -> None:
settings = XrayConfigSettings()
settings.transparent.mode = "proxy"
settings.transparent.type = "tproxy"
settings.transparent.output_bypass_rules = "udp 198.51.100.10:3478\nall 192.168.0.0/24"
iptables_rules = generate_transparent_rules(settings, backend="iptables")
nft_rules = generate_transparent_rules(settings, backend="nft")
assert "iptables -w 2 -t mangle -A TP_OUT -p udp -d 198.51.100.10 --dport 3478 -j RETURN" in iptables_rules.setup
assert "iptables -w 2 -t mangle -A TP_OUT -p tcp -d 192.168.0.0/24 -j RETURN" in iptables_rules.setup
assert "iptables -w 2 -t mangle -A TP_OUT -p udp -d 192.168.0.0/24 -j RETURN" in iptables_rules.setup
assert "ip daddr 198.51.100.10 meta l4proto udp th dport 3478 return" in nft_rules.nftables
def test_close_mode_has_no_system_rules() -> None:
settings = XrayConfigSettings()
settings.transparent.mode = "close"
+55 -10
View File
@@ -6,7 +6,17 @@ from urllib.response import addinfourl
import pytest
from pyxray.libs.xray_assets import DEFAULT_VERSION, download_bytes, download_bytes_stream, ensure_xray_assets, official_archive_url
from pyxray.libs.xray_assets import (
DEFAULT_VERSION,
default_archive_name,
default_xray_version,
download_bytes,
download_bytes_stream,
ensure_xray_assets,
latest_xray_version,
official_archive_url,
required_files,
)
def _zip_bytes(files: dict[str, bytes]) -> bytes:
@@ -19,13 +29,46 @@ def _zip_bytes(files: dict[str, bytes]) -> bytes:
def test_official_archive_url_defaults_to_xray_core_v26_5_9() -> None:
assert DEFAULT_VERSION == "v26.5.9"
assert official_archive_url() == "https://github.com/XTLS/Xray-core/releases/download/v26.5.9/Xray-linux-64.zip"
assert official_archive_url(archive_name="Xray-linux-64.zip") == (
"https://github.com/XTLS/Xray-core/releases/download/v26.5.9/Xray-linux-64.zip"
)
def test_default_archive_name_is_platform_specific() -> None:
assert default_archive_name(os_name="posix", machine="x86_64") == "Xray-linux-64.zip"
assert default_archive_name(os_name="nt", machine="AMD64") == "Xray-windows-64.zip"
assert default_archive_name(os_name="nt", machine="ARM64") == "Xray-windows-arm64-v8a.zip"
def test_required_files_are_platform_specific() -> None:
assert required_files(os_name="posix") == ("xray", "geoip.dat", "geosite.dat")
assert required_files(os_name="nt") == ("xray.exe", "geoip.dat", "geosite.dat")
def test_latest_xray_version_reads_github_release_tag() -> None:
def fetcher(url: str, timeout: float) -> str:
assert url == "https://api.github.com/repos/XTLS/Xray-core/releases/latest"
assert timeout == 5.0
return '{"tag_name": "v99.1.2"}'
assert latest_xray_version(fetcher=fetcher) == "v99.1.2"
def test_default_xray_version_falls_back_to_pinned_version(monkeypatch) -> None: # noqa: ANN001
monkeypatch.setattr("pyxray.libs.xray_assets._DEFAULT_VERSION_CACHE", None)
monkeypatch.setattr(
"pyxray.libs.xray_assets.latest_xray_version",
lambda *, timeout: (_ for _ in ()).throw(TimeoutError("slow")),
)
assert default_xray_version(timeout=5.0) == DEFAULT_VERSION
def test_ensure_xray_assets_extracts_official_archive_files(tmp_path) -> None: # noqa: ANN001
xray_name = required_files()[0]
archive = _zip_bytes(
{
"xray": b"bin",
xray_name: b"bin",
"geoip.dat": b"geoip",
"geosite.dat": b"geosite",
"README.md": b"ignored",
@@ -42,13 +85,13 @@ def test_ensure_xray_assets_extracts_official_archive_files(tmp_path) -> None:
assert result.ready is True
assert result.downloaded == ("archive",)
assert calls == [official_archive_url()]
assert (tmp_path / "xray").read_bytes() == b"bin"
assert (tmp_path / xray_name).read_bytes() == b"bin"
assert (tmp_path / "geoip.dat").read_bytes() == b"geoip"
assert (tmp_path / "geosite.dat").read_bytes() == b"geosite"
def test_version_and_archive_url_can_be_overridden(tmp_path) -> None: # noqa: ANN001
archive = _zip_bytes({"xray": b"bin", "geoip.dat": b"geoip", "geosite.dat": b"geosite"})
archive = _zip_bytes({required_files()[0]: b"bin", "geoip.dat": b"geoip", "geosite.dat": b"geosite"})
calls = []
def downloader(url: str) -> bytes:
@@ -61,7 +104,8 @@ def test_version_and_archive_url_can_be_overridden(tmp_path) -> None: # noqa: A
def test_dat_urls_override_archive_dat_files(tmp_path) -> None: # noqa: ANN001
archive = _zip_bytes({"xray": b"bin", "geoip.dat": b"old-geoip", "geosite.dat": b"old-geosite"})
xray_name = required_files()[0]
archive = _zip_bytes({xray_name: b"bin", "geoip.dat": b"old-geoip", "geosite.dat": b"old-geosite"})
payloads = {
official_archive_url(): archive,
"https://mirror.invalid/geoip.dat": b"new-geoip",
@@ -76,13 +120,14 @@ def test_dat_urls_override_archive_dat_files(tmp_path) -> None: # noqa: ANN001
)
assert result.downloaded == ("archive", "geoip.dat", "geosite.dat")
assert (tmp_path / "xray").read_bytes() == b"bin"
assert (tmp_path / xray_name).read_bytes() == b"bin"
assert (tmp_path / "geoip.dat").read_bytes() == b"new-geoip"
assert (tmp_path / "geosite.dat").read_bytes() == b"new-geosite"
def test_existing_files_skip_download(tmp_path) -> None: # noqa: ANN001
(tmp_path / "xray").write_bytes(b"bin")
xray_name = required_files()[0]
(tmp_path / xray_name).write_bytes(b"bin")
(tmp_path / "geoip.dat").write_bytes(b"geoip")
(tmp_path / "geosite.dat").write_bytes(b"geosite")
@@ -90,7 +135,7 @@ def test_existing_files_skip_download(tmp_path) -> None: # noqa: ANN001
assert result.ready is True
assert result.downloaded == ()
assert result.skipped == ("xray", "geoip.dat", "geosite.dat")
assert result.skipped == (xray_name, "geoip.dat", "geosite.dat")
def test_force_redownloads_selected_existing_file(tmp_path) -> None: # noqa: ANN001
@@ -116,7 +161,7 @@ def test_force_redownloads_selected_existing_file(tmp_path) -> None: # noqa: AN
def test_missing_required_file_raises_after_bad_archive(tmp_path) -> None: # noqa: ANN001
archive = _zip_bytes({"xray": b"bin", "geoip.dat": b"geoip"})
archive = _zip_bytes({required_files()[0]: b"bin", "geoip.dat": b"geoip"})
with pytest.raises(FileNotFoundError, match="geosite.dat"):
ensure_xray_assets(tmp_path, downloader=lambda url: archive)
+69 -15
View File
@@ -80,12 +80,17 @@ def test_settings_defaults_match_v2raya_core_values() -> None:
assert settings.inbounds.http_port == 20171
assert settings.inbounds.rule_socks_port == 0
assert settings.inbounds.rule_http_port == 20172
assert settings.inbounds.auth_user == ""
assert settings.inbounds.auth_password == ""
assert settings.transparent.mode == "close"
assert settings.transparent.type == "redirect"
assert settings.transparent.port == 52345
assert settings.transparent.docker_transparent is True
assert settings.transparent.docker_transparent_cidrs == "172.16.0.0/12"
assert settings.transparent.output_bypass_rules == ""
assert settings.dns.query_strategy == "UseIPv4"
assert settings.dns.special_mode == "none"
assert settings.dns.fakedns_domains == "geosite:geolocation-!cn"
assert settings.dns.rules == [
DnsRuleSettings(server="localhost", domains="geosite:private", outbound="direct"),
DnsRuleSettings(server="223.5.5.5", domains="geosite:cn", outbound="direct"),
@@ -102,7 +107,10 @@ def test_generate_default_config_matches_v2raya_template_shape() -> None:
assert _inbound(config, "socks")["protocol"] == "socks"
assert _inbound(config, "socks")["settings"]["udp"] is True
assert _inbound(config, "http")["port"] == 20171
assert _inbound(config, "rule-http")["port"] == 20172
assert _inbound(config, "rule-mixed")["port"] == 20172
assert _inbound(config, "rule-mixed")["protocol"] == "mixed"
assert _inbound(config, "rule-mixed")["settings"] == {"auth": "noauth", "udp": True, "allowTransparent": False}
assert "rule-http" not in {item["tag"] for item in config["inbounds"]}
assert "rule-socks" not in {item["tag"] for item in config["inbounds"]}
assert _outbound(config, "proxy")["protocol"] == "shadowsocks"
assert _outbound(config, "direct")["protocol"] == "freedom"
@@ -116,6 +124,22 @@ def test_generate_default_config_matches_v2raya_template_shape() -> None:
assert "dns-in" not in {item["tag"] for item in config["inbounds"]}
def test_generate_mixed_inbound_with_password_auth() -> None:
settings = XrayConfigSettings()
settings.inbounds.auth_user = "alice"
settings.inbounds.auth_password = "secret"
mixed = _inbound(generate_xray_config(parse_node_link(_ss_link()), settings), "rule-mixed")
assert mixed["protocol"] == "mixed"
assert mixed["settings"] == {
"auth": "password",
"udp": True,
"allowTransparent": False,
"accounts": [{"user": "alice", "pass": "secret"}],
}
def test_generate_none_log_level_disables_xray_logs() -> None:
settings = XrayConfigSettings()
settings.core.log_level = "none"
@@ -290,7 +314,6 @@ def test_generate_vless_ws_early_data_grpc_and_xhttp_settings() -> None:
def test_generate_inbounds_custom_api_and_transparent_tproxy() -> None:
settings = XrayConfigSettings()
settings.inbounds.port_sharing = True
settings.inbounds.rule_socks_port = 20173
settings.inbounds.vmess_port = 20174
settings.inbounds.api.port = 20175
settings.inbounds.custom.append(CustomInboundSettings(tag="extra-http", protocol="http", port=20176))
@@ -300,7 +323,8 @@ def test_generate_inbounds_custom_api_and_transparent_tproxy() -> None:
config = generate_xray_config(parse_node_link(_ss_link()), settings)
assert _inbound(config, "socks")["listen"] == "0.0.0.0"
assert _inbound(config, "rule-socks")["port"] == 20173
assert _inbound(config, "rule-mixed")["listen"] == "0.0.0.0"
assert _inbound(config, "rule-mixed")["port"] == 20172
assert _inbound(config, "vmess")["protocol"] == "vmess"
assert _inbound(config, "extra-http")["protocol"] == "http"
assert _inbound(config, "transparent")["streamSettings"]["sockopt"]["tproxy"] == "tproxy"
@@ -311,15 +335,14 @@ def test_generate_inbounds_custom_api_and_transparent_tproxy() -> None:
def test_generate_routing_modes_follow_v2raya_rules() -> None:
settings = XrayConfigSettings()
settings.inbounds.rule_socks_port = 20173
whitelist = generate_xray_config(parse_node_link(_ss_link()), settings)["routing"]["rules"]
assert {"type": "field", "inboundTag": ["rule-http", "rule-socks"], "domain": ["geosite:cn"], "outboundTag": "direct"} in whitelist
assert {"type": "field", "inboundTag": ["rule-http", "rule-socks"], "domain": ["geosite:google"], "outboundTag": "proxy"} in whitelist
assert {"type": "field", "inboundTag": ["rule-mixed"], "domain": ["geosite:cn"], "outboundTag": "direct"} in whitelist
assert {"type": "field", "inboundTag": ["rule-mixed"], "domain": ["geosite:google"], "outboundTag": "proxy"} in whitelist
settings.routing.mode = "gfwlist"
gfwlist = generate_xray_config(parse_node_link(_ss_link()), settings)["routing"]["rules"]
assert {"type": "field", "inboundTag": ["rule-http", "rule-socks"], "outboundTag": "direct"} in gfwlist
assert {"type": "field", "inboundTag": ["rule-mixed"], "outboundTag": "direct"} in gfwlist
assert any("91.108.4.0/22" in rule.get("ip", []) for rule in gfwlist)
@@ -330,8 +353,8 @@ def test_generate_custom_routing_a_applies_before_proxy_mode_fallback() -> None:
rules = generate_xray_config(parse_node_link(_ss_link()), settings)["routing"]["rules"]
google_rule = {"type": "field", "inboundTag": ["rule-http"], "outboundTag": "direct", "domain": ["keyword:google"]}
fallback = {"type": "field", "inboundTag": ["rule-http"], "outboundTag": "proxy"}
google_rule = {"type": "field", "inboundTag": ["rule-mixed"], "outboundTag": "direct", "domain": ["keyword:google"]}
fallback = {"type": "field", "inboundTag": ["rule-mixed"], "outboundTag": "proxy"}
assert google_rule in rules
assert fallback in rules
assert rules.index(google_rule) < rules.index(fallback)
@@ -339,7 +362,6 @@ def test_generate_custom_routing_a_applies_before_proxy_mode_fallback() -> None:
def test_generate_custom_text_rules_before_route_mode_and_default_proxy() -> None:
settings = XrayConfigSettings()
settings.inbounds.rule_socks_port = 20173
settings.routing.mode = "whitelist"
settings.routing.default_rule = "proxy"
settings.routing.routing_a = "domain(geosite:google, domain:example.com)->proxy\nip(geoip:cn)->direct"
@@ -347,20 +369,20 @@ def test_generate_custom_text_rules_before_route_mode_and_default_proxy() -> Non
rules = generate_xray_config(parse_node_link(_ss_link()), settings)["routing"]["rules"]
custom_domain = {
"type": "field",
"inboundTag": ["rule-http", "rule-socks"],
"inboundTag": ["rule-mixed"],
"domain": ["geosite:google", "domain:example.com"],
"outboundTag": "proxy",
}
custom_ip = {
"type": "field",
"inboundTag": ["rule-http", "rule-socks"],
"inboundTag": ["rule-mixed"],
"ip": ["geoip:cn"],
"outboundTag": "direct",
}
assert rules.index(custom_domain) < rules.index({"type": "field", "inboundTag": ["rule-http", "rule-socks"], "domain": ["geosite:cn"], "outboundTag": "direct"})
assert rules.index(custom_ip) < rules.index({"type": "field", "inboundTag": ["rule-http", "rule-socks"], "ip": ["geoip:private", "geoip:cn"], "outboundTag": "direct"})
assert {"type": "field", "inboundTag": ["rule-http", "rule-socks"], "outboundTag": "proxy"} in rules
assert rules.index(custom_domain) < rules.index({"type": "field", "inboundTag": ["rule-mixed"], "domain": ["geosite:cn"], "outboundTag": "direct"})
assert rules.index(custom_ip) < rules.index({"type": "field", "inboundTag": ["rule-mixed"], "ip": ["geoip:private", "geoip:cn"], "outboundTag": "direct"})
assert {"type": "field", "inboundTag": ["rule-mixed"], "outboundTag": "proxy"} in rules
def test_generate_dns_rules_and_dns_routing() -> None:
@@ -383,6 +405,30 @@ def test_generate_dns_rules_and_dns_routing() -> None:
assert any(rule.get("domain") == ["dns.google"] and rule.get("outboundTag") == "proxy" for rule in config["routing"]["rules"])
def test_generate_fakedns_adds_dns_server_pool_and_sniffing() -> None:
settings = XrayConfigSettings()
settings.transparent.mode = "gfwlist"
settings.transparent.type = "redirect"
settings.dns.special_mode = "fakedns"
config = generate_xray_config(parse_node_link(_ss_link()), settings)
assert config["fakedns"] == [{"ipPool": "198.18.0.0/15", "poolSize": 65535}]
assert {"address": "fakedns", "domains": ["geosite:geolocation-!cn"]} in config["dns"]["servers"]
assert "fakedns" in _inbound(config, "transparent")["sniffing"]["destOverride"]
assert "fakedns" in _inbound(config, "rule-mixed")["sniffing"]["destOverride"]
def test_generate_fakedns_uses_custom_domain_scope() -> None:
settings = XrayConfigSettings()
settings.dns.special_mode = "fakedns"
settings.dns.fakedns_domains = "geosite:gfw\nkeyword:example"
config = generate_xray_config(parse_node_link(_ss_link()), settings)
assert {"address": "fakedns", "domains": ["geosite:gfw", "keyword:example"]} in config["dns"]["servers"]
def test_validate_settings_rejects_invalid_values() -> None:
settings = XrayConfigSettings()
settings.transparent.type = "bad"
@@ -391,6 +437,14 @@ def test_validate_settings_rejects_invalid_values() -> None:
validate_settings(settings)
def test_validate_settings_requires_complete_inbound_auth() -> None:
settings = XrayConfigSettings()
settings.inbounds.auth_user = "alice"
with pytest.raises(ValueError, match="auth_user"):
validate_settings(settings)
def _ss_link() -> str:
user = base64.urlsafe_b64encode(b"chacha20-ietf-poly1305:secret").decode().rstrip("=")
return f"ss://{user}@ss.example.net:8388#ss-node"
+19 -3
View File
@@ -2,8 +2,12 @@ from __future__ import annotations
import subprocess
import socket
import os
import shutil
import sys
from pathlib import Path
from pyxray.libs.xray_assets import xray_executable_name
from pyxray.libs.xray_config import XrayConfigSettings, write_transparent_rule_files
from pyxray.libs.xray_runtime import XrayServiceManager
from pyxray.libs.xray_transparent_runtime import TransparentRuntime
@@ -58,9 +62,7 @@ def test_transparent_runtime_auto_backend_falls_back_to_nft_when_iptables_setup_
def test_xray_service_manager_reports_inbound_port_conflict(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
xray.chmod(0o755)
_write_fake_xray(tmp_path)
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
@@ -101,3 +103,17 @@ def _executor(commands: list[list[str]], failures: set[str] | None = None):
)
return execute
def _write_fake_xray(directory: Path) -> Path:
xray = directory / xray_executable_name()
if os.name == "nt":
try:
os.link(sys.executable, xray)
except OSError:
shutil.copy2(sys.executable, xray)
(directory / "run").write_text("import time\ntime.sleep(30)\n", encoding="utf-8")
return xray
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
xray.chmod(0o755)
return xray
+138
View File
@@ -0,0 +1,138 @@
from __future__ import annotations
from pathlib import Path
from pyxray import cli
from pyxray.libs.xray_assets import XrayAssets
def test_no_subcommand_defaults_to_web(monkeypatch) -> None: # noqa: ANN001
captured = {}
def fake_run_web(host: str, port: int, xray_dir: str) -> None:
captured.update({"host": host, "port": port, "xray_dir": xray_dir})
monkeypatch.setattr(cli, "run_web", fake_run_web)
cli.main([])
assert captured == {"host": "0.0.0.0", "port": 8000, "xray_dir": "data/xray"}
def test_web_subcommand_still_uses_web_runner(monkeypatch) -> None: # noqa: ANN001
captured = {}
def fake_run_web(host: str, port: int, xray_dir: str) -> None:
captured.update({"host": host, "port": port, "xray_dir": xray_dir})
monkeypatch.setattr(cli, "run_web", fake_run_web)
cli.main(["web", "--host", "127.0.0.1", "--port", "9000", "--xray-dir", "runtime/xray"])
assert captured == {"host": "127.0.0.1", "port": 9000, "xray_dir": "runtime/xray"}
def test_configs_download_prints_download_toml(tmp_path: Path, capsys, monkeypatch) -> None: # noqa: ANN001
monkeypatch.chdir(tmp_path)
data = tmp_path / "data"
data.mkdir()
(data / "download.toml").write_text('version = "v1.2.3"\n', encoding="utf-8")
cli.main(["configs", "--download", "--xray-dir", "data/xray"])
assert capsys.readouterr().out == 'version = "v1.2.3"\n'
def test_clear_download_removes_only_download_settings_and_known_assets(tmp_path: Path, monkeypatch) -> None: # noqa: ANN001
monkeypatch.chdir(tmp_path)
data = tmp_path / "data"
xray_dir = data / "xray"
xray_dir.mkdir(parents=True)
(data / "download.toml").write_text("download", encoding="utf-8")
(data / "nodes.toml").write_text("nodes", encoding="utf-8")
(xray_dir / "xray").write_bytes(b"xray")
(xray_dir / "xray.exe").write_bytes(b"xray.exe")
(xray_dir / "geoip.dat").write_bytes(b"geoip")
(xray_dir / "geosite.dat").write_bytes(b"geosite")
(xray_dir / "user-file.txt").write_text("keep", encoding="utf-8")
cli.main(["clear", "--download", "--xray-dir", "data/xray"])
assert not (data / "download.toml").exists()
assert not (xray_dir / "xray").exists()
assert not (xray_dir / "xray.exe").exists()
assert not (xray_dir / "geoip.dat").exists()
assert not (xray_dir / "geosite.dat").exists()
assert (data / "nodes.toml").exists()
assert (xray_dir / "user-file.txt").exists()
def test_clear_all_removes_known_data_and_keeps_unrelated_files(tmp_path: Path, monkeypatch) -> None: # noqa: ANN001
monkeypatch.chdir(tmp_path)
data = tmp_path / "data"
xray_dir = data / "xray"
transparent_dir = data / "transparent"
transparent_dir.mkdir(parents=True)
xray_dir.mkdir()
for name in ("download.toml", "nodes.toml", "settings.toml", "config.json", "service-state.json", "xray.log"):
(data / name).write_text(name, encoding="utf-8")
(transparent_dir / "transparent-iptables-setup.sh").write_text("script", encoding="utf-8")
(xray_dir / "xray").write_bytes(b"xray")
(xray_dir / "geoip.dat").write_bytes(b"geoip")
(data / "notes.txt").write_text("keep", encoding="utf-8")
(xray_dir / "custom.dat").write_text("keep", encoding="utf-8")
cli.main(["clear", "--all", "--xray-dir", "data/xray"])
for name in ("download.toml", "nodes.toml", "settings.toml", "config.json", "service-state.json", "xray.log"):
assert not (data / name).exists()
assert not transparent_dir.exists()
assert not (xray_dir / "xray").exists()
assert not (xray_dir / "geoip.dat").exists()
assert (data / "notes.txt").exists()
assert (xray_dir / "custom.dat").exists()
def test_download_command_persists_settings_and_reuses_ensure(monkeypatch, tmp_path: Path) -> None: # noqa: ANN001
monkeypatch.chdir(tmp_path)
captured = {}
def fake_ensure_xray_assets(directory, **options): # noqa: ANN001
captured["directory"] = directory
captured["options"] = options
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)
xray = path / "xray"
xray.write_bytes(b"xray")
geoip = path / "geoip.dat"
geoip.write_bytes(b"geoip")
geosite = path / "geosite.dat"
geosite.write_bytes(b"geosite")
return XrayAssets(directory=path, xray=xray, geoip=geoip, geosite=geosite, downloaded=("geoip.dat",))
monkeypatch.setattr("pyxray.libs.app_data.ensure_xray_assets", fake_ensure_xray_assets)
cli.main(
[
"download",
"--target",
"geoip",
"--directory",
"data/xray",
"--version",
"v1.2.3",
"--geoip-url",
"https://mirror.example.invalid/geoip.dat",
"--force",
]
)
assert captured["directory"] == "data/xray"
assert captured["options"]["target"] == "geoip"
assert captured["options"]["version"] == "v1.2.3"
assert captured["options"]["geoip_url"] == "https://mirror.example.invalid/geoip.dat"
assert captured["options"]["force"] is True
content = (tmp_path / "data" / "download.toml").read_text(encoding="utf-8")
assert 'directory = "data/xray"' in content
assert 'target = "geoip"' in content
assert 'version = "v1.2.3"' in content
+12
View File
@@ -0,0 +1,12 @@
from __future__ import annotations
from pyxray import __version__
from pyxray.web import server
def test_startup_banner_prints_version_first(capsys) -> None: # noqa: ANN001
server._print_startup_banner("127.0.0.1", 3309)
lines = capsys.readouterr().out.splitlines()
assert lines[0] == f" * Pyxray version: {__version__}"
assert lines[1] == " * Pyxray URL: http://127.0.0.1:3309"
+174 -51
View File
@@ -3,16 +3,18 @@ from __future__ import annotations
import base64
import json
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
from pyxray.libs.xray_assets import XrayAssets
from pyxray.libs.xray_assets import XrayAssets, xray_executable_name
from pyxray.web.server import create_app
def test_index_shows_asset_status(tmp_path: Path) -> None:
(tmp_path / "xray").write_bytes(b"bin")
(tmp_path / xray_executable_name()).write_bytes(b"bin")
app = create_app(tmp_path)
client = app.test_client()
@@ -42,12 +44,13 @@ def test_ensure_api_uses_form_values(monkeypatch, tmp_path: Path) -> None: # no
captured["directory"] = directory
captured["options"] = options
Path(directory).mkdir(parents=True, exist_ok=True)
(Path(directory) / "xray").write_bytes(b"bin")
xray = Path(directory) / xray_executable_name()
xray.write_bytes(b"bin")
(Path(directory) / "geoip.dat").write_bytes(b"geoip")
(Path(directory) / "geosite.dat").write_bytes(b"geosite")
return XrayAssets(
directory=Path(directory),
xray=Path(directory) / "xray",
xray=xray,
geoip=Path(directory) / "geoip.dat",
geosite=Path(directory) / "geosite.dat",
downloaded=("archive",),
@@ -121,6 +124,17 @@ def test_asset_settings_api_persists_download_form_values(tmp_path: Path) -> Non
assert "v9.9.9" in body
def test_asset_settings_default_version_uses_latest_release(monkeypatch, tmp_path: Path) -> None: # noqa: ANN001
monkeypatch.setattr("pyxray.libs.xray_assets._DEFAULT_VERSION_CACHE", None)
monkeypatch.setattr("pyxray.libs.xray_assets.latest_xray_version", lambda *, timeout: "v99.9.9")
app = create_app(tmp_path)
client = app.test_client()
payload = client.get("/api/xray/assets/settings").get_json()
assert payload["version"] == "v99.9.9"
def test_job_records_real_download_percent(monkeypatch, tmp_path: Path) -> None: # noqa: ANN001
def fake_download_bytes_stream(url, progress, **options): # noqa: ANN001
progress(url, 0, 10)
@@ -131,12 +145,13 @@ def test_job_records_real_download_percent(monkeypatch, tmp_path: Path) -> None:
def fake_ensure_xray_assets(directory, *, downloader, **options): # noqa: ANN001, ARG001
downloader("https://mirror.invalid/xray.zip")
Path(directory).mkdir(parents=True, exist_ok=True)
(Path(directory) / "xray").write_bytes(b"bin")
xray = Path(directory) / xray_executable_name()
xray.write_bytes(b"bin")
(Path(directory) / "geoip.dat").write_bytes(b"geoip")
(Path(directory) / "geosite.dat").write_bytes(b"geosite")
return XrayAssets(
directory=Path(directory),
xray=Path(directory) / "xray",
xray=xray,
geoip=Path(directory) / "geoip.dat",
geosite=Path(directory) / "geosite.dat",
downloaded=("archive",),
@@ -173,9 +188,7 @@ def test_cancel_job_api_marks_job_cancel_requested(tmp_path: Path) -> None:
def test_xray_service_api_starts_stops_and_reads_logs(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\necho xray-started\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path, stdout="xray-started")
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -199,9 +212,7 @@ def test_xray_service_api_starts_stops_and_reads_logs(tmp_path: Path) -> None:
def test_xray_service_restores_desired_running_state_on_app_start(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\necho restored-start\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path, stdout="restored-start")
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -228,9 +239,7 @@ def test_xray_service_restores_desired_running_state_on_app_start(tmp_path: Path
def test_xray_service_log_forwarder_flushes_line_output_quickly(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\necho first-line\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path, stdout="first-line")
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -247,9 +256,7 @@ def test_xray_service_log_forwarder_flushes_line_output_quickly(tmp_path: Path)
def test_xray_service_start_regenerates_config_from_saved_settings(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path)
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -268,9 +275,7 @@ def test_xray_service_start_regenerates_config_from_saved_settings(tmp_path: Pat
def test_xray_service_applies_transparent_rules_on_start_and_cleans_on_stop(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path)
app = create_app(tmp_path)
commands: list[str] = []
app.extensions["pyxray_transparent_runtime"].executor = _recording_executor(commands)
@@ -307,9 +312,7 @@ def test_xray_service_applies_transparent_rules_on_start_and_cleans_on_stop(tmp_
def test_xray_service_rolls_back_when_transparent_setup_fails(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path)
app = create_app(tmp_path)
commands: list[str] = []
app.extensions["pyxray_transparent_runtime"].executor = _recording_executor(
@@ -344,9 +347,7 @@ def test_xray_service_rolls_back_when_transparent_setup_fails(tmp_path: Path) ->
def test_xray_service_shutdown_stops_managed_process(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path)
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -366,9 +367,7 @@ def test_xray_service_uses_absolute_paths_when_app_created_with_relative_xray_di
monkeypatch.chdir(tmp_path)
xray_dir = tmp_path / "data" / "xray"
xray_dir.mkdir(parents=True)
xray = xray_dir / "xray"
xray.write_text("#!/bin/sh\necho relative-started\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
xray = _write_fake_xray(xray_dir, stdout="relative-started")
app = create_app("data/xray")
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -385,9 +384,7 @@ def test_xray_service_uses_absolute_paths_when_app_created_with_relative_xray_di
def test_xray_service_api_reports_missing_config(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\nsleep 30\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path)
app = create_app(tmp_path)
client = app.test_client()
@@ -398,9 +395,7 @@ def test_xray_service_api_reports_missing_config(tmp_path: Path) -> None:
def test_xray_service_api_records_immediate_start_failure_output(tmp_path: Path) -> None:
xray = tmp_path / "xray"
xray.write_text("#!/bin/sh\necho 'bind: permission denied' >&2\nexit 23\n", encoding="utf-8")
os.chmod(xray, 0o755)
_write_fake_xray(tmp_path, stderr="bind: permission denied", exit_code=23, sleep_seconds=0)
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -426,10 +421,10 @@ def test_xray_service_api_clears_logs(tmp_path: Path) -> None:
assert response.status_code == 200
assert response.get_json()["content"] == ""
assert response.get_json()["offset"] == len("old log")
assert logs.get_json()["content"] == "old log"
assert response.get_json()["offset"] == 0
assert logs.get_json()["content"] == ""
assert client.get(f"/api/xray/service/logs?offset={response.get_json()['offset']}").get_json()["content"] == ""
assert (tmp_path / "xray.log").read_text(encoding="utf-8") == "old log"
assert (tmp_path / "xray.log").read_text(encoding="utf-8") == ""
def test_xray_service_logs_api_reads_from_offset(tmp_path: Path) -> None:
@@ -447,6 +442,29 @@ def test_xray_service_logs_api_reads_from_offset(tmp_path: Path) -> None:
assert payload["offset"] == log.stat().st_size
def test_xray_service_logs_api_returns_compact_route_lines(tmp_path: Path) -> None:
log = tmp_path / "xray.log"
log.write_text(
"\n".join(
[
"2026/05/27 04:17:06.829641 [Info] [3636958196] app/dispatcher: sniffed domain: git.pchuan.top",
"2026/05/27 04:17:06.829662 [Info] [3636958196] app/dispatcher: taking detour [direct] for [tcp:git.pchuan.top:80]",
"2026/05/27 04:17:06.829733 from 192.168.0.76:53842 accepted tcp:117.72.47.28:80 [transparent -> direct]",
"2026/05/27 04:17:18.057882 [Info] app/proxyman/outbound: failed to process outbound traffic",
]
),
encoding="utf-8",
)
app = create_app(tmp_path)
client = app.test_client()
payload = client.get("/api/xray/service/logs?format=compact").get_json()
assert "2026/05/27 04:17:06 git.pchuan.top:80 -> direct" in payload["content"]
assert "accepted tcp" not in payload["content"]
assert "failed to process outbound traffic" in payload["content"]
def test_xray_service_logs_api_returns_latest_1000_lines(tmp_path: Path) -> None:
(tmp_path / "xray.log").write_text("\n".join(f"line-{index}" for index in range(1205)), encoding="utf-8")
app = create_app(tmp_path)
@@ -465,10 +483,8 @@ def test_xray_service_prefers_persisted_download_directory(tmp_path: Path) -> No
preferred_dir = tmp_path / "download-xray"
default_dir.mkdir()
preferred_dir.mkdir()
(default_dir / "xray").write_text("#!/bin/sh\necho default-xray\nsleep 30\n", encoding="utf-8")
(preferred_dir / "xray").write_text("#!/bin/sh\necho preferred-xray\nsleep 30\n", encoding="utf-8")
os.chmod(default_dir / "xray", 0o755)
os.chmod(preferred_dir / "xray", 0o755)
_write_fake_xray(default_dir, stdout="default-xray")
preferred_xray = _write_fake_xray(preferred_dir, stdout="preferred-xray")
app = create_app(default_dir, default_data_dir=tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -480,9 +496,9 @@ def test_xray_service_prefers_persisted_download_directory(tmp_path: Path) -> No
client.post("/api/xray/service/stop")
logs = client.get("/api/xray/service/logs").get_json()["content"]
assert started["xray"] == str(preferred_dir / "xray")
assert started["xray"] == str(preferred_xray)
assert started["xray_dir"] == str(preferred_dir)
assert str(preferred_dir / "xray") in logs
assert str(preferred_xray) in logs
def test_xray_service_falls_back_to_default_directory_when_saved_directory_has_no_xray(tmp_path: Path) -> None:
@@ -490,8 +506,7 @@ def test_xray_service_falls_back_to_default_directory_when_saved_directory_has_n
preferred_dir = tmp_path / "download-xray"
default_dir.mkdir()
preferred_dir.mkdir()
(default_dir / "xray").write_text("#!/bin/sh\necho default-xray\nsleep 30\n", encoding="utf-8")
os.chmod(default_dir / "xray", 0o755)
default_xray = _write_fake_xray(default_dir, stdout="default-xray")
app = create_app(default_dir, default_data_dir=tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
@@ -503,10 +518,10 @@ def test_xray_service_falls_back_to_default_directory_when_saved_directory_has_n
client.post("/api/xray/service/stop")
logs = client.get("/api/xray/service/logs").get_json()["content"]
assert started["xray"] == str(default_dir / "xray")
assert started["xray"] == str(default_xray)
assert started["xray_dir"] == str(default_dir)
assert started["fallback_xray_dir"] == str(default_dir)
assert str(default_dir / "xray") in logs
assert str(default_xray) in logs
def test_nodes_api_imports_lists_selects_and_deletes_node(tmp_path: Path) -> None:
@@ -530,6 +545,29 @@ def test_nodes_api_imports_lists_selects_and_deletes_node(tmp_path: Path) -> Non
assert client.get("/api/nodes").get_json()["nodes"] == []
def test_selecting_node_restarts_running_xray(tmp_path: Path) -> None:
_write_fake_xray(tmp_path, stdout="started", echo_args=True)
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": "\n".join([_ss_link("one", "one"), _ss_link("two", "two")])})
nodes = client.get("/api/nodes").get_json()["nodes"]
client.post("/api/nodes/select", data={"node_id": nodes[0]["id"]})
client.post(
"/api/xray/config/settings",
data={"settings_toml": '[inbounds]\nsocks_port = 0\nhttp_port = 0\nrule_http_port = 0\n'},
)
first = client.post("/api/xray/service/start").get_json()
selected = client.post("/api/nodes/select", data={"node_id": nodes[1]["id"]})
config = json.loads((tmp_path / "config.json").read_text(encoding="utf-8"))
client.post("/api/xray/service/stop")
assert selected.status_code == 200
assert selected.get_json()["service"]["running"] is True
assert selected.get_json()["service"]["pid"] != first["pid"]
assert config["outbounds"][0]["settings"]["servers"][0]["password"] == "two"
def test_xray_config_api_saves_settings_and_generates_config(tmp_path: Path) -> None:
app = create_app(tmp_path)
client = app.test_client()
@@ -554,6 +592,32 @@ def test_xray_config_api_saves_settings_and_generates_config(tmp_path: Path) ->
assert (tmp_path / "transparent" / "transparent-iptables-setup.sh").exists()
def test_saving_settings_restarts_running_xray(tmp_path: Path) -> None:
_write_fake_xray(tmp_path, stdout="settings-started")
app = create_app(tmp_path)
client = app.test_client()
client.post("/api/nodes/import", data={"links": _ss_link("secret", "ss-node")})
node = client.get("/api/nodes").get_json()["nodes"][0]
client.post("/api/nodes/select", data={"node_id": node["id"]})
client.post(
"/api/xray/config/settings",
data={"settings_toml": '[inbounds]\nsocks_port = 0\nhttp_port = 0\nrule_http_port = 0\n'},
)
first = client.post("/api/xray/service/start").get_json()
saved = client.post(
"/api/xray/config/settings",
data={"settings_toml": '[core]\nlog_level = "debug"\n[inbounds]\nsocks_port = 0\nhttp_port = 0\nrule_http_port = 0\n'},
)
config = json.loads((tmp_path / "config.json").read_text(encoding="utf-8"))
client.post("/api/xray/service/stop")
assert saved.status_code == 200
assert saved.get_json()["service"]["running"] is True
assert saved.get_json()["service"]["pid"] != first["pid"]
assert config["log"]["loglevel"] == "debug"
def test_xray_config_api_saves_settings_from_form_controls(tmp_path: Path) -> None:
app = create_app(tmp_path)
client = app.test_client()
@@ -572,6 +636,8 @@ def test_xray_config_api_saves_settings_from_form_controls(tmp_path: Path) -> No
"inbounds.http_port": "0",
"inbounds.rule_socks_port": "0",
"inbounds.rule_http_port": "20181",
"inbounds.auth_user": "alice",
"inbounds.auth_password": "secret",
"inbounds.vmess_port": "0",
"inbounds.inbound_sniffing": "http,tls",
"inbounds.route_only": "on",
@@ -585,10 +651,12 @@ def test_xray_config_api_saves_settings_from_form_controls(tmp_path: Path) -> No
"transparent.socks_port": "52306",
"transparent.ipforward": "off",
"transparent.tun_auto_route": "on",
"transparent.output_bypass_rules": "tcp 117.72.47.28:33010",
"dns.query_strategy": "UseIPv4",
"dns.local_dns_listen": "on",
"dns.antipollution": "closed",
"dns.special_mode": "none",
"dns.special_mode": "fakedns",
"dns.fakedns_domains": "geosite:gfw\nkeyword:example",
"dns.rules": "localhost|geosite:private|direct\n8.8.8.8||proxy",
"outbounds.0.tag": "proxy",
"outbounds.0.probe_url": "https://www.gstatic.com/generate_204",
@@ -608,12 +676,22 @@ def test_xray_config_api_saves_settings_from_form_controls(tmp_path: Path) -> No
assert "log_level = \"error\"" in saved.get_json()["settings_toml"]
assert "socks_port = 0" in saved.get_json()["settings_toml"]
assert "http_port = 0" in saved.get_json()["settings_toml"]
assert "auth_user = \"alice\"" in saved.get_json()["settings_toml"]
assert "auth_password = \"secret\"" in saved.get_json()["settings_toml"]
assert "route_only = true" in saved.get_json()["settings_toml"]
assert "output_bypass_rules = \"tcp 117.72.47.28:33010\"" in saved.get_json()["settings_toml"]
assert "special_mode = \"fakedns\"" in saved.get_json()["settings_toml"]
assert "fakedns_domains = \"geosite:gfw\\nkeyword:example\"" in saved.get_json()["settings_toml"]
assert generated.status_code == 200
assert config["log"]["loglevel"] == "error"
assert config["inbounds"][0]["port"] == 20180
assert any(inbound["tag"] == "rule-mixed" and inbound["port"] == 20181 for inbound in config["inbounds"])
assert next(inbound for inbound in config["inbounds"] if inbound["tag"] == "rule-mixed")["settings"]["accounts"] == [
{"user": "alice", "pass": "secret"}
]
assert config["outbounds"][0]["mux"] == {"enabled": True, "concurrency": 16}
assert config["dns"]["queryStrategy"] == "UseIPv4"
assert config["fakedns"] == [{"ipPool": "198.18.0.0/15", "poolSize": 65535}]
assert {"address": "fakedns", "domains": ["geosite:gfw", "keyword:example"]} in config["dns"]["servers"]
def test_xray_config_api_mux_zero_disables_mux(tmp_path: Path) -> None:
@@ -748,6 +826,51 @@ def _ss_link(password: str, name: str) -> str:
return f"ss://{user}@ss.example.net:8388#{name}"
def _write_fake_xray(
directory: Path,
*,
stdout: str = "",
stderr: str = "",
exit_code: int = 0,
sleep_seconds: float = 30,
echo_args: bool = False,
) -> Path:
xray = directory / xray_executable_name()
code = _fake_xray_code(stdout=stdout, stderr=stderr, exit_code=exit_code, sleep_seconds=sleep_seconds, echo_args=echo_args)
if os.name == "nt":
try:
os.link(sys.executable, xray)
except OSError:
shutil.copy2(sys.executable, xray)
(directory / "run").write_text(code, encoding="utf-8")
return xray
xray.write_text(f"#!{sys.executable}\n{code}", encoding="utf-8")
xray.chmod(0o755)
return xray
def _fake_xray_code(
*,
stdout: str,
stderr: str,
exit_code: int,
sleep_seconds: float,
echo_args: bool,
) -> str:
lines = ["import sys", "import time"]
if stdout:
if echo_args:
lines.append(f"print({stdout!r} + ' ' + ' '.join(sys.argv[1:]), flush=True)")
else:
lines.append(f"print({stdout!r}, flush=True)")
if stderr:
lines.append(f"print({stderr!r}, file=sys.stderr, flush=True)")
if sleep_seconds:
lines.append(f"time.sleep({sleep_seconds!r})")
lines.append(f"raise SystemExit({exit_code})")
return "\n".join(lines) + "\n"
def _recording_executor(
commands: list[str],
failures: set[str] | None = None,
Generated
+1 -1
View File
@@ -154,7 +154,7 @@ wheels = [
[[package]]
name = "pyxray"
version = "1.0.0"
version = "1.0.5"
source = { editable = "." }
dependencies = [
{ name = "flask" },