diff --git a/apps_script/Code.gs b/apps_script/Code.gs
index 970e98f..67f2a15 100644
--- a/apps_script/Code.gs
+++ b/apps_script/Code.gs
@@ -43,7 +43,7 @@ const SAFE_REPLAY_METHODS = { GET: 1, HEAD: 1, OPTIONS: 1 };
function doPost(e) {
try {
var req = JSON.parse(e.postData.contents);
- if (req.k !== AUTH_KEY) return _json({ e: "unauthorized" });
+ if (req.k !== AUTH_KEY) return _api(false, "unauthorized", "auth key mismatch", null);
// Batch mode: { k, q: [...] }
if (Array.isArray(req.q)) return _doBatch(req.q);
@@ -51,7 +51,7 @@ function doPost(e) {
// Single mode
return _doSingle(req);
} catch (err) {
- return _json({ e: String(err) });
+ return _api(false, "bad_request", String(err), null);
}
}
@@ -72,14 +72,14 @@ function _maybeGzip(bytes) {
function _doSingle(req) {
if (!req.u || typeof req.u !== "string" || !req.u.match(/^https?:\/\//i)) {
- return _json({ e: "bad url" });
+ return _api(false, "bad_url", "url must be http(s)", null);
}
// Loop guard: refuse to relay back to any Apps Script deployment.
// This fires when an exit node URL is misconfigured to point at a GAS
// script — without this check the script would call itself indefinitely
// and burn through the daily UrlFetch quota in seconds.
if (_GAS_URL_RE.test(req.u)) {
- return _json({ e: "loop detected: relay target cannot be a Google Apps Script URL" });
+ return _api(false, "loop_detected", "relay target cannot be a Google Apps Script URL", null);
}
var opts = _buildOpts(req);
var resp = UrlFetchApp.fetch(req.u, opts);
@@ -90,7 +90,7 @@ function _doSingle(req) {
b: Utilities.base64Encode(gz.b),
};
if (gz.gz) result.gz = 1;
- return _json(result);
+ return _api(true, "ok", "relay_success", result);
}
function _doBatch(items) {
@@ -178,7 +178,7 @@ function _doBatch(items) {
}
}
}
- return _json({ q: results });
+ return _api(true, "ok", "batch_relay_success", { q: results });
}
function _buildOpts(req) {
@@ -221,19 +221,21 @@ function _respHeaders(resp) {
}
function doGet(e) {
- return HtmlService.createHtmlOutput(
- "
My App" +
- '' +
- "Welcome
This application is running normally.
" +
- ""
- );
+ return _api(true, "healthy", "deployment is reachable", {
+ now_ms: Date.now(),
+ quota_note: "apps script quotas are managed by Google account limits"
+ });
}
-function _json(obj) {
- // HtmlService responses can stay on script.google.com for /dev, while
- // ContentService commonly bounces through script.googleusercontent.com.
- // The Python client extracts the JSON payload from the body either way.
- return HtmlService.createHtmlOutput(JSON.stringify(obj)).setXFrameOptionsMode(
- HtmlService.XFrameOptionsMode.ALLOWALL
- );
+function _api(ok, code, message, data) {
+ return ContentService
+ .createTextOutput(
+ JSON.stringify({
+ ok: !!ok,
+ code: String(code || (ok ? "ok" : "error")),
+ message: String(message || ""),
+ data: data === undefined ? null : data
+ })
+ )
+ .setMimeType(ContentService.MimeType.JSON);
}
diff --git a/apps_script/vps_exit_node.py b/apps_script/vps_exit_node.py
index 027bbab..7b48fe7 100644
--- a/apps_script/vps_exit_node.py
+++ b/apps_script/vps_exit_node.py
@@ -34,6 +34,7 @@
import logging
import os
import re
+import socket
import socketserver
import sys
import urllib.error
@@ -209,6 +210,23 @@ def _relay_request(
}
+
+
+def _relay_udp_packet(host: str, port: int, payload: bytes) -> dict:
+ """Send one UDP packet and return one response packet (best effort)."""
+ if not host or port <= 0 or port > 65535:
+ return {"e": "bad_udp_target"}
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ sock.settimeout(2.0)
+ sock.sendto(payload, (host, port))
+ data, _ = sock.recvfrom(65535)
+ return {"ok": True, "payload": base64.b64encode(data).decode()}
+ except Exception as exc:
+ return {"e": str(exc) or type(exc).__name__}
+ finally:
+ sock.close()
+
# ---------------------------------------------------------------------------
# HTTP request handler
# ---------------------------------------------------------------------------
@@ -265,6 +283,7 @@ def do_POST(self): # noqa: N802
m = str(body.get("m") or "GET").upper()
h = _sanitize_headers(body.get("h"))
b64 = body.get("b")
+ udp_mode = bool(body.get("udp"))
if not _PSK:
self._send_json(500, {"e": "server_psk_missing"})
@@ -275,6 +294,25 @@ def do_POST(self): # noqa: N802
self._send_json(401, {"e": "unauthorized"})
return
+
+ if udp_mode:
+ host = str(body.get("host") or "")
+ try:
+ port = int(body.get("port") or 0)
+ except Exception:
+ port = 0
+ payload = b""
+ pb64 = body.get("payload")
+ if isinstance(pb64, str) and pb64:
+ try:
+ payload = base64.b64decode(pb64)
+ except Exception:
+ self._send_json(400, {"e": "bad_udp_payload"})
+ return
+ result = _relay_udp_packet(host, port, payload)
+ self._send_json(200, result)
+ return
+
if not _safe_url(u):
self._send_json(400, {"e": "bad_url"})
return
diff --git a/docs/GETTING_STARTED.md b/docs/GETTING_STARTED.md
index 4a26f85..309620e 100644
--- a/docs/GETTING_STARTED.md
+++ b/docs/GETTING_STARTED.md
@@ -1,156 +1,218 @@
-# Getting Started
+# Getting Started (Complete Guide)
-This guide keeps only the setup path most users need. Follow it once, then come back to the root README only when you need another topic.
+This is the **full, practical setup guide** for first-time users.
+If you follow this page in order, you can go from zero to a working proxy.
-## What You Need
+---
-- Python 3.10 or newer
-- A free Google account
-- Git, or the ZIP download from GitHub
-- A browser where you can set an HTTP proxy
+## Quick Outcome
-## 1. Get The Project
+When done correctly:
+- Local HTTP proxy works at `127.0.0.1:8085`
+- Local SOCKS5 proxy works at `127.0.0.1:1080`
+- HTTPS websites open without certificate warnings
+- You can run route checks with `--scan` and stability-first checks with `--adaptive-scan`
-Choose whichever option works on your network.
+---
-**Option A: ZIP**
+## Prerequisites
-[Click to Download](https://github.com/masterking32/MasterHttpRelayVPN/archive/refs/heads/python_testing.zip)
+- Python `3.10+`
+- A Google account (for Apps Script relay)
+- Git (optional, ZIP download also works)
+- A browser where you can set manual proxy
+---
-**Option B: Git**
+## 1) Download The Project
+
+### Option A — ZIP
+
+Download and extract:
+
+-
+
+Then open terminal in extracted folder.
+
+### Option B — Git
```bash
git clone https://github.com/masterking32/MasterHttpRelayVPN.git
cd MasterHttpRelayVPN
```
-## 2. Deploy The Google Relay
+---
-The relay is the small Apps Script program that fetches websites for you.
+## 2) Deploy The Google Apps Script Relay
-1. Open [Google Apps Script](https://script.google.com/) and sign in.
+1. Open and sign in.
2. Click **New project**.
-3. Delete the default editor content.
-4. Open [apps_script/Code.gs](../apps_script/Code.gs), copy everything, and paste it into Apps Script.
-5. Change this line to a long secret:
+3. Delete default code.
+4. Open local file [`apps_script/Code.gs`](../apps_script/Code.gs), copy all code, paste into Apps Script.
+5. Change:
```javascript
const AUTH_KEY = "your-secret-password-here";
```
-6. Click **Deploy** -> **New deployment**.
+ to your own long random secret.
+
+6. Click **Deploy** → **New deployment**.
7. Select **Web app**.
-8. Set **Execute as** to **Me**.
-9. Set **Who has access** to **Anyone**.
-10. Click **Deploy**, authorize the app, and copy the **Deployment ID**.
+8. Set **Execute as** = **Me**.
+9. Set **Who has access** = **Anyone**.
+10. Deploy, authorize, copy **Deployment ID**.
-Keep the `AUTH_KEY` and Deployment ID nearby. You need both locally.
+You now need **both** values locally:
+- `Deployment ID`
+- `AUTH_KEY`
-## 3. Run The One-Click Launcher
+---
-**Windows**
+## 3) Start The App (Recommended)
+
+### Windows
```cmd
start.bat
```
-**Linux / macOS**
+### Linux / macOS
```bash
chmod +x start.sh
./start.sh
```
-The launcher creates `.venv`, installs dependencies, runs `setup.py` if `config.json` is missing, and starts the proxy.
-
-If dependency installation fails through PyPI, the launcher retries through the runflare mirror automatically.
+What launcher does:
+- creates `.venv`
+- installs dependencies
+- runs setup wizard if `config.json` is missing
+- starts proxy
-## 4. Answer The Setup Wizard
+---
-When the wizard opens:
+## 4) Fill Setup Wizard Correctly
-1. Enter the same `auth_key` you placed inside [apps_script/Code.gs](../apps_script/Code.gs).
-2. Paste the Apps Script Deployment ID.
-3. Keep the default HTTP proxy port `8085` unless you already use that port.
-4. Keep LAN sharing off unless other devices must use this proxy.
+When prompted:
+1. `auth_key` = exactly same as `AUTH_KEY` in Apps Script
+2. `script_id` = your Deployment ID
+3. Keep HTTP port `8085` unless busy
+4. Keep LAN sharing disabled unless you need other devices
-The wizard writes `config.json` for you.
+The wizard creates `config.json`.
-## 5. Configure Your Browser
+---
-Use the HTTP proxy for normal browsing:
+## 5) Configure Browser Proxy
| Field | Value |
-|-------|-------|
+|---|---|
| Proxy type | HTTP |
| Address | `127.0.0.1` |
| Port | `8085` |
-Firefox path: **Settings** -> **General** -> **Network Settings** -> **Manual proxy**. Enter `127.0.0.1` and `8085`, then enable the option to also use it for HTTPS.
+For Firefox: Settings → General → Network Settings → Manual proxy.
+Enable proxy for HTTPS too.
-Chrome and Edge use the system proxy on Windows. You can also use extensions such as FoxyProxy or SwitchyOmega for easier switching.
+---
-## 6. Install The CA Certificate
+## 6) Install Local CA (HTTPS Required)
-HTTPS browsing needs the local CA certificate generated by the proxy. The file is created at `ca/ca.crt` after first run.
+The proxy generates `ca/ca.crt`.
+If auto-install fails, install manually.
-**The app tries to install it automatically. If it cannot, install it manually:**
+### Windows
+1. Open `ca/ca.crt`
+2. Install Certificate
+3. Current User
+4. Place in **Trusted Root Certification Authorities**
+5. Restart browser fully
-**Windows**
+### macOS
+1. Open `ca/ca.crt` in Keychain Access
+2. Open certificate → Trust
+3. Set **Always Trust**
+4. Restart browser
-1. Double-click `ca/ca.crt`.
-2. Choose **Install Certificate**.
-3. Choose **Current User**.
-4. Choose **Place all certificates in the following store**.
-5. Select **Trusted Root Certification Authorities**.
-6. Finish, then fully restart your browser.
-
-**macOS**
-
-1. Open `ca/ca.crt` in Keychain Access.
-2. Find the certificate and open it.
-3. Expand **Trust**.
-4. Set **When using this certificate** to **Always Trust**.
-5. Close the dialog, enter your password, and restart your browser.
-
-**Linux Ubuntu / Debian**
+### Ubuntu / Debian
```bash
sudo cp ca/ca.crt /usr/local/share/ca-certificates/masterhttp-relay.crt
sudo update-ca-certificates
```
-Restart your browser after installing.
+Restart browser.
+
+### Firefox (if needed)
+Firefox can use a separate trust store:
+- Settings → Privacy & Security → Certificates → View Certificates → Authorities → Import `ca/ca.crt`
+- Enable trust for website identification
+
+---
+
+## 7) Verify It Works
+
+- Open normal websites through browser proxy.
+- If `unauthorized` appears: `AUTH_KEY` mismatch between Apps Script and `config.json`.
+- If HTTPS certificate errors appear: CA not trusted correctly.
-**Firefox**
+---
-Firefox may use a separate certificate store:
+## 8) Route Quality Commands
-1. Open **Settings** -> **Privacy & Security** -> **Certificates** -> **View Certificates**.
-2. Go to **Authorities**.
-3. Click **Import** and select `ca/ca.crt`.
-4. Enable **Trust this CA to identify websites**.
+### Fast reachability scan
-## Manual Run Commands
+```bash
+python main.py --scan
+```
-Use these only if you are not using the launcher:
+Use suggested `google_ip` in `config.json`.
+
+### Stability-first adaptive scan (recommended for unstable networks)
```bash
+python main.py --adaptive-scan
+```
+
+This ranking is based on route stability metrics (not only minimum ping).
+
+---
+
+## 9) Manual Start (Without launcher)
+
+### Windows
+
+```cmd
python -m venv .venv
.venv\Scripts\python -m pip install -r requirements.txt
.venv\Scripts\python setup.py
.venv\Scripts\python main.py
```
-On Linux / macOS, replace `.venv\Scripts\python` with `.venv/bin/python`.
+### Linux / macOS
+
+```bash
+python3 -m venv .venv
+.venv/bin/python -m pip install -r requirements.txt
+.venv/bin/python setup.py
+.venv/bin/python main.py
+```
+
+---
+
+## 10) Common Problems (Short)
-## Done
+- `unauthorized`: auth key mismatch
+- proxy connects but sites fail: wrong Deployment ID or script deployment not public
+- HTTPS warnings: CA not installed/trusted
+- some services block Google egress: use Exit Node guide
-When everything is working, the terminal shows the HTTP proxy on `127.0.0.1:8085` and SOCKS5 on `127.0.0.1:1080`.
+---
-Next useful pages:
+## Next Docs
- [Troubleshooting](TROUBLESHOOTING.md)
-- [Configuration Reference](CONFIGURATION.md)
-- [Exit Node Guide](exit-node/EXIT_NODE_DEPLOYMENT.md)
+- [Configuration](CONFIGURATION.md)
+- [Exit Node](exit-node/EXIT_NODE_DEPLOYMENT.md)
+- [Architecture](ARCHITECTURE.md)
diff --git a/docs/fa/GETTING_STARTED.md b/docs/fa/GETTING_STARTED.md
index b0102c6..c10d2b3 100644
--- a/docs/fa/GETTING_STARTED.md
+++ b/docs/fa/GETTING_STARTED.md
@@ -1,70 +1,217 @@
-# شروع سریع
+# شروع سریع (راهنمای کامل)
-این راهنما مسیر ساده راهاندازی را نشان میدهد: یک رله Google Apps Script، یک فایل `config.json`، و پراکسی محلی روی سیستم شما.
+این صفحه یک **راهنمای کامل و ساده** برای شروع است.
+اگر مرحلهبهمرحله جلو بروید، در پایان پراکسی شما کاملا کار خواهد کرد.
-## 1. دریافت پروژه
+---
-**با Git:**
+## نتیجه نهایی
+
+اگر همه چیز درست انجام شود:
+- پراکسی HTTP روی `127.0.0.1:8085` فعال میشود
+- پراکسی SOCKS5 روی `127.0.0.1:1080` فعال میشود
+- سایتهای HTTPS بدون خطای گواهی باز میشوند
+- میتوانید با `--scan` و `--adaptive-scan` کیفیت مسیر را بررسی کنید
+
+---
+
+## پیشنیازها
+
+- Python نسخه `3.10+`
+- یک اکانت Google (برای Apps Script)
+- Git (اختیاری؛ دانلود ZIP هم کافی است)
+- مرورگری که تنظیم پراکسی دستی داشته باشد
+
+---
+
+## 1) دریافت پروژه
+
+### روش A — ZIP
+
+دانلود و extract:
+-
+
+بعد داخل پوشه پروژه terminal باز کنید.
+
+### روش B — Git
```bash
git clone https://github.com/masterking32/MasterHttpRelayVPN.git
cd MasterHttpRelayVPN
```
-**با ZIP:**
+---
+
+## 2) ساخت رله Google Apps Script
-- صفحه [GitHub پروژه](https://github.com/masterking32/MasterHttpRelayVPN) را باز کنید.
-- روی **Code** -> **Download ZIP** کلیک کنید.
-- فایل ZIP را extract کنید.
-- داخل پوشه `MasterHttpRelayVPN` یک terminal باز کنید.
+1. وارد شوید.
+2. روی **New project** کلیک کنید.
+3. کد پیشفرض را پاک کنید.
+4. فایل [`apps_script/Code.gs`](../../apps_script/Code.gs) را باز کنید، کل محتوا را کپی و در Apps Script paste کنید.
+5. این مقدار را تغییر دهید:
-## 2. ساخت رله Google
+ ```javascript
+ const AUTH_KEY = "your-secret-password-here";
+ ```
-- به [Google Apps Script](https://script.google.com/) بروید و یک پروژه جدید بسازید.
-- محتوای [apps_script/Code.gs](../../apps_script/Code.gs) را داخل فایل `Code.gs` کپی کنید.
-- مقدار `AUTH_KEY` را به یک رمز طولانی و تصادفی تغییر دهید.
-- از مسیر **Deploy** -> **New deployment** نوع **Web app** را انتخاب کنید.
-- گزینه **Execute as** را روی **Me** و گزینه دسترسی را روی **Anyone** بگذارید.
-- Deploy کنید و `Deployment ID` را نگه دارید.
+ و یک رمز طولانی و تصادفی خودتان بگذارید.
-بعد از هر تغییر در `Code.gs` باید deployment جدید بسازید.
+6. مسیر **Deploy** → **New deployment** را بزنید.
+7. نوع **Web app** را انتخاب کنید.
+8. **Execute as** را روی **Me** بگذارید.
+9. **Who has access** را روی **Anyone** بگذارید.
+10. Deploy کنید، دسترسی را تایید کنید، و **Deployment ID** را کپی کنید.
-## 3. اجرای لانچر
+دو مقدار مهم برای سیستم محلی:
+- `Deployment ID`
+- `AUTH_KEY`
-**Windows:**
+---
+
+## 3) اجرای برنامه (روش پیشنهادی)
+
+### Windows
```cmd
start.bat
```
-**Linux / macOS:**
+### Linux / macOS
```bash
chmod +x start.sh
./start.sh
```
-لانچر محیط مجازی میسازد، وابستگیها را نصب میکند، اگر `config.json` وجود نداشته باشد setup wizard را اجرا میکند، و سپس پراکسی را بالا میآورد.
+لانچر کارهای زیر را انجام میدهد:
+- ساخت `.venv`
+- نصب وابستگیها
+- اجرای setup wizard اگر `config.json` موجود نباشد
+- اجرای پراکسی
+
+---
-## 4. تنظیم مرورگر
+## 4) تکمیل Setup Wizard
-مرورگر را روی پراکسی زیر تنظیم کنید:
+در wizard:
+1. `auth_key` دقیقا برابر `AUTH_KEY` در Apps Script باشد
+2. `script_id` همان Deployment ID شما باشد
+3. پورت HTTP را `8085` بگذارید (مگر اینکه اشغال باشد)
+4. LAN sharing را فقط وقتی لازم دارید روشن کنید
+
+در پایان فایل `config.json` ساخته میشود.
+
+---
+
+## 5) تنظیم پراکسی در مرورگر
| گزینه | مقدار |
-|-------|-------|
+|---|---|
| نوع پراکسی | HTTP |
| آدرس | `127.0.0.1` |
| پورت | `8085` |
-| SOCKS5، اختیاری | `127.0.0.1:1080` |
-برای HTTPS اگر مرورگر خطای گواهی داد، فایل `ca/ca.crt` را به عنوان trusted root نصب کنید و مرورگر را کامل ببندید و دوباره باز کنید.
+در Firefox: Settings → General → Network Settings → Manual proxy
+و برای HTTPS هم فعال کنید.
+
+---
+
+## 6) نصب CA محلی (برای HTTPS ضروری)
+
+فایل گواهی در `ca/ca.crt` ساخته میشود.
+اگر نصب خودکار انجام نشد، دستی نصب کنید.
+
+### Windows
+1. فایل `ca/ca.crt` را باز کنید
+2. Install Certificate
+3. Current User
+4. ذخیره در **Trusted Root Certification Authorities**
+5. مرورگر را کامل ببندید و دوباره باز کنید
+
+### macOS
+1. `ca/ca.crt` را در Keychain Access باز کنید
+2. بخش Trust را باز کنید
+3. روی **Always Trust** بگذارید
+4. مرورگر را ریاستارت کنید
+
+### Ubuntu / Debian
+
+```bash
+sudo cp ca/ca.crt /usr/local/share/ca-certificates/masterhttp-relay.crt
+sudo update-ca-certificates
+```
+
+مرورگر را ریاستارت کنید.
+
+### Firefox (در صورت نیاز)
+ممکن است Firefox store جداگانه داشته باشد:
+- Settings → Privacy & Security → Certificates → View Certificates → Authorities → Import `ca/ca.crt`
+- گزینه اعتماد برای شناسایی وبسایت را فعال کنید
+
+---
+
+## 7) تست عملکرد
+
+- چند سایت عادی باز کنید.
+- اگر `unauthorized` دیدید: `AUTH_KEY` ناهماهنگ است.
+- اگر خطای گواهی HTTPS دیدید: CA درست نصب نشده.
+
+---
+
+## 8) دستورهای بررسی کیفیت مسیر
+
+### اسکن سریع دسترسی IP
+
+```bash
+python main.py --scan
+```
+
+IP پیشنهادی را در `config.json` قرار دهید.
+
+### اسکن تطبیقیِ پایداریمحور (پیشنهادی برای شبکه ناپایدار)
+
+```bash
+python main.py --adaptive-scan
+```
+
+این اسکن فقط روی کمترین پینگ تصمیم نمیگیرد و پایداری را هم در نظر میگیرد.
+
+---
+
+## 9) اجرای دستی (بدون لانچر)
+
+### Windows
+
+```cmd
+python -m venv .venv
+.venv\Scripts\python -m pip install -r requirements.txt
+.venv\Scripts\python setup.py
+.venv\Scripts\python main.py
+```
+
+### Linux / macOS
+
+```bash
+python3 -m venv .venv
+.venv/bin/python -m pip install -r requirements.txt
+.venv/bin/python setup.py
+.venv/bin/python main.py
+```
+
+---
+
+## 10) مشکلات رایج (خلاصه)
-## 5. بررسی سریع
+- `unauthorized`: عدم تطابق auth key
+- اتصال پراکسی برقرار است ولی سایتها باز نمیشوند: Deployment ID اشتباه یا دسترسی Web app درست نیست
+- خطای HTTPS: گواهی CA نصب/Trusted نشده
+- برخی سرویسها خروجی Google را میبندند: از Exit Node استفاده کنید
-- اگر `unauthorized` دیدید، مقدار `AUTH_KEY` در Apps Script باید دقیقا با `auth_key` در `config.json` یکی باشد.
-- اگر صفحهها باز نمیشوند، [رفع مشکل](TROUBLESHOOTING.md) را ببینید.
-- اگر سرعت پایین است، دستور `python main.py --scan` را اجرا کنید و IP پیشنهادی را در `config.json` بگذارید.
+---
-## قدم بعدی
+## صفحههای بعدی
-برای همه گزینههای تنظیمات، [مرجع تنظیمات](CONFIGURATION.md) را بخوانید. برای مسیرهای خاص مثل ChatGPT یا Turnstile، [راهنمای Exit Node](../exit-node/EXIT_NODE_DEPLOYMENT_FA.md) را ببینید.
+- [رفع مشکل](TROUBLESHOOTING.md)
+- [مرجع تنظیمات](CONFIGURATION.md)
+- [راهنمای Exit Node](../exit-node/EXIT_NODE_DEPLOYMENT_FA.md)
+- [معماری](ARCHITECTURE.md)
diff --git a/main.py b/main.py
index 76e7370..268c16f 100644
--- a/main.py
+++ b/main.py
@@ -24,6 +24,7 @@
from core.constants import __version__
from core.lan_utils import log_lan_access
from core.google_ip_scanner import scan_sync
+from core.adaptive_transport import AdaptiveRouteEngine, ProbeTarget
from core.logging_utils import configure as configure_logging, print_banner
from proxy.mitm import CA_CERT_FILE
from proxy.proxy_server import ProxyServer
@@ -99,6 +100,11 @@ def parse_args():
action="store_true",
help="Scan Google IPs to find the fastest reachable one and exit.",
)
+ parser.add_argument(
+ "--adaptive-scan",
+ action="store_true",
+ help="Run adaptive transport scan (jitter/loss/stability-first) and exit.",
+ )
return parser.parse_args()
@@ -224,6 +230,24 @@ def main():
print("Deploy the Apps Script from Code.gs and paste the Deployment ID.")
sys.exit(1)
+
+ if args.adaptive_scan:
+ configure_logging("INFO")
+ from core.constants import CANDIDATE_IPS
+ engine = AdaptiveRouteEngine(config.get("route_db", "route_intel.sqlite3"))
+ targets = [
+ ProbeTarget(ip=ip, port=443, sni=config.get("front_domain", "www.google.com"), transport_profile="vless_reality")
+ for ip in CANDIDATE_IPS
+ ]
+ ranked = asyncio.run(engine.evaluate(targets))
+ if not ranked:
+ print("No viable adaptive routes found")
+ sys.exit(1)
+ print("Adaptive route ranking (stability-first):")
+ for i, route in enumerate(ranked[:5], 1):
+ print(f"{i}. {route.target.ip} score={route.score:.4f} median={route.median_rtt_ms:.1f}ms jitter={route.jitter_ms:.1f} loss={route.packet_loss:.2%} handshake={route.handshake_success_rate:.2%}")
+ sys.exit(0)
+
# ── Google IP Scanner ──────────────────────────────────────────────────
if args.scan:
configure_logging("INFO")
diff --git a/setup.py b/setup.py
index 0bb2f48..e82a75c 100644
--- a/setup.py
+++ b/setup.py
@@ -18,6 +18,8 @@
import sys
from pathlib import Path
+from src.core.config_sync import ConfigSyncManager, ConfigSyncError
+
HERE = Path(__file__).resolve().parent
CONFIG_PATH = HERE / "config.json"
EXAMPLE_PATH = HERE / "config.example.json"
@@ -159,38 +161,80 @@ def write_config(cfg: dict) -> None:
f.write("\n")
-def main() -> int:
- print()
- print(bold("MasterHttpRelayVPN - setup wizard"))
- print(dim("Answer a few questions and we'll write config.json for you."))
+def _sync_runtime(cfg: dict) -> None:
+ sync = ConfigSyncManager(HERE)
+ artifacts = sync.sync_all_configs(cfg)
+ (HERE / "apps_script" / "Code.gs").write_text(artifacts["apps_script/Code.gs"], encoding="utf-8")
+ (HERE / "apps_script" / "cloudflare_worker.js").write_text(artifacts["apps_script/cloudflare_worker.js"], encoding="utf-8")
+ CONFIG_PATH.write_text(artifacts["config.json"], encoding="utf-8")
+
+def _load_existing_or_base() -> dict:
if CONFIG_PATH.exists():
- if not prompt_yes_no("config.json already exists. Overwrite?", default=False):
- print(dim("Nothing changed."))
- return 0
+ with CONFIG_PATH.open(encoding="utf-8") as f:
+ return json.load(f)
+ return load_base_config()
- cfg = load_base_config()
- suggested_key = random_auth_key()
+def _edit_existing(cfg: dict) -> dict:
print()
- print(bold("Shared password (auth_key)"))
- print(dim(" Must match AUTH_KEY inside apps_script/Code.gs."))
- cfg["auth_key"] = prompt("auth_key", default=suggested_key)
-
+ print(bold("Edit existing config"))
+ cfg["auth_key"] = prompt("auth_key", default=str(cfg.get("auth_key", random_auth_key())))
cfg = configure_apps_script(cfg)
cfg = configure_network(cfg)
+ return cfg
- write_config(cfg)
+def main_menu() -> str:
print()
- print(green(f"[OK] wrote {CONFIG_PATH.name}"))
- print()
- print(bold("Next step:"))
- print(f" python main.py")
+ print(bold("Main Menu"))
+ print(" 1) Create new config")
+ print(" 2) Edit existing config")
+ print(" 3) Validate deployments")
+ print(" 4) Sync configs to all files")
+ print(" 5) View quota status")
+ print(" 6) Advanced settings")
+ print(" 7) Exit")
+ return prompt("Select option", default="7")
+
+
+def main() -> int:
print()
- print(yellow("Reminder: the AUTH_KEY inside apps_script/Code.gs must match the auth_key"))
- print(yellow("you just entered - otherwise the relay will return 'unauthorized'."))
- return 0
+ print(bold("MasterHttpRelayVPN - setup wizard"))
+ print(dim("Answer a few questions and we'll write config.json for you."))
+
+ cfg = _load_existing_or_base()
+ while True:
+ choice = main_menu()
+ if choice == "1":
+ cfg = load_base_config()
+ cfg["auth_key"] = prompt("auth_key", default=random_auth_key())
+ cfg = configure_apps_script(cfg)
+ cfg = configure_network(cfg)
+ write_config(cfg)
+ _sync_runtime(cfg)
+ print(green("[OK] created and synchronized config/runtime artifacts"))
+ elif choice == "2":
+ cfg = _edit_existing(cfg)
+ write_config(cfg)
+ _sync_runtime(cfg)
+ print(green("[OK] edited and synchronized config/runtime artifacts"))
+ elif choice == "3":
+ valid = bool(cfg.get("script_id") or cfg.get("script_ids"))
+ print(green("[OK] deployment configuration looks present") if valid else red("[FAIL] missing script_id/script_ids"))
+ elif choice == "4":
+ try:
+ _sync_runtime(cfg)
+ print(green("[OK] sync complete"))
+ except ConfigSyncError as exc:
+ print(red(f"[FAIL] sync failed: {exc}"))
+ elif choice == "5":
+ print(yellow("[WARN] quota dashboard not yet implemented in setup.py"))
+ elif choice == "6":
+ print(dim("Advanced settings are edited through option #2 in this release."))
+ elif choice == "7":
+ print(dim("Done."))
+ return 0
if __name__ == "__main__":
diff --git a/src/core/adaptive_transport/__init__.py b/src/core/adaptive_transport/__init__.py
new file mode 100644
index 0000000..2e7b603
--- /dev/null
+++ b/src/core/adaptive_transport/__init__.py
@@ -0,0 +1,11 @@
+from .engine import AdaptiveRouteEngine, AdaptiveRouteConfig
+from .models import ProbeTarget, RouteScore
+from .storage import RouteIntelligenceStore
+
+__all__ = [
+ "AdaptiveRouteEngine",
+ "AdaptiveRouteConfig",
+ "ProbeTarget",
+ "RouteScore",
+ "RouteIntelligenceStore",
+]
diff --git a/src/core/adaptive_transport/engine.py b/src/core/adaptive_transport/engine.py
new file mode 100644
index 0000000..78258f8
--- /dev/null
+++ b/src/core/adaptive_transport/engine.py
@@ -0,0 +1,140 @@
+from __future__ import annotations
+
+import asyncio
+import logging
+import time
+from dataclasses import dataclass
+
+from .hygiene import validate_public_ip
+from .models import ProbeTarget, RouteScore, RuntimeMetrics, SessionState
+from .probe import AsyncRouteProbe, summarize
+from .storage import RouteIntelligenceStore
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(slots=True)
+class AdaptiveRouteConfig:
+ min_concurrency: int = 1
+ max_concurrency: int = 16
+ sticky_session_s: float = 180.0
+ switch_guard_s: float = 30.0
+ circuit_breaker_failures: int = 4
+
+
+class AdaptiveRouteEngine:
+ def __init__(self, db_path: str, cfg: AdaptiveRouteConfig | None = None):
+ self.cfg = cfg or AdaptiveRouteConfig()
+ self.store = RouteIntelligenceStore(db_path)
+ self.probe = AsyncRouteProbe()
+ self._active_route: RouteScore | None = None
+ self._active_until = 0.0
+ self._failure_counts: dict[str, int] = {}
+ self._session = SessionState()
+ self._scan_tasks: set[asyncio.Task] = set()
+ self._stopped = False
+
+ async def evaluate(self, targets: list[ProbeTarget], cancel_event: asyncio.Event | None = None) -> list[RouteScore]:
+ if self._stopped:
+ return []
+ ordered_targets = sorted(targets, key=self._route_key)
+ concurrency = max(1, min(self.cfg.max_concurrency, max(self.cfg.min_concurrency, len(ordered_targets))))
+ sem = asyncio.Semaphore(concurrency)
+ results: list[RouteScore] = []
+
+ async def worker(t: ProbeTarget):
+ validate_public_ip(t.ip)
+ if self._stopped or (cancel_event and cancel_event.is_set()):
+ return
+ async with sem:
+ if self._stopped or (cancel_event and cancel_event.is_set()):
+ return
+ samples = await self.probe.probe(t, include_quic=t.transport_profile == "quic")
+ med, jit, loss, hs, stable = summarize(samples)
+ score_value = (stable * 0.45) + ((1.0 - min(1.0, loss)) * 0.25) + (hs * 0.20) + (1.0 / (1.0 + jit + med / 100.0) * 0.10)
+ score = RouteScore(t, med, jit, loss, hs, stable, score_value)
+ results.append(score)
+ logger.info(
+ "route_score_breakdown",
+ extra={
+ "route": self._route_key(t),
+ "median_rtt_ms": med,
+ "jitter_ms": jit,
+ "packet_loss": loss,
+ "handshake_success_rate": hs,
+ "session_stability": stable,
+ "score": score_value,
+ },
+ )
+ await self.store.record_score(score)
+
+ self._scan_tasks = {asyncio.create_task(worker(t)) for t in ordered_targets}
+ try:
+ await asyncio.gather(*self._scan_tasks, return_exceptions=False)
+ finally:
+ self._scan_tasks.clear()
+ return sorted(results, key=lambda r: (r.score, -r.packet_loss, -r.jitter_ms, -r.median_rtt_ms, self._route_key(r.target)), reverse=True)
+
+ async def select_route(self, candidates: list[RouteScore], gameplay_active: bool) -> RouteScore | None:
+ if self._stopped:
+ return None
+ now = time.time()
+ if not candidates:
+ logger.info("route_rejected", extra={"reason": "no_candidates", "selected": self._route_key(self._active_route.target) if self._active_route else None})
+ return self._active_route
+ best = candidates[0]
+ if self._active_route and gameplay_active and not self._hard_failure(self._active_route.target):
+ self._session.state = "session_active"
+ logger.info("session_transition", extra={"transition": "session_active", "route": self._route_key(self._active_route.target)})
+ logger.info("route_rejected", extra={"reason": "session_active_locked", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)})
+ return self._active_route
+ if (
+ self._active_route
+ and not self._hard_failure(self._active_route.target)
+ and now - self._active_route.sampled_at < self.cfg.switch_guard_s
+ ):
+ logger.info(
+ "route_rejected",
+ extra={
+ "reason": "switch_guard",
+ "selected": self._route_key(self._active_route.target),
+ "candidate": self._route_key(best.target),
+ },
+ )
+ return self._active_route
+ if self._active_route and not self._hard_failure(self._active_route.target):
+ logger.info("route_rejected", extra={"reason": "active_route_not_failed", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)})
+ return self._active_route
+ self._active_route = best
+ self._active_until = now + self.cfg.sticky_session_s
+ self._session.state = "session_stable_window" if gameplay_active else "session_start"
+ self._session.stable_since = now
+ logger.info("session_transition", extra={"transition": self._session.state, "route": self._route_key(best.target)})
+ logger.info("route_selected", extra={"route": self._route_key(best.target), "score": best.score, "median_rtt_ms": best.median_rtt_ms, "jitter_ms": best.jitter_ms, "packet_loss": best.packet_loss, "handshake": best.handshake_success_rate, "stability": best.session_stability})
+ return best
+
+ async def record_runtime_metrics(self, target: ProbeTarget, metrics: RuntimeMetrics) -> None:
+ await self.store.record_runtime_metrics(target, metrics)
+
+ def register_route_failure(self, target: ProbeTarget) -> bool:
+ key = self._route_key(target)
+ self._failure_counts[key] = self._failure_counts.get(key, 0) + 1
+ logger.info("session_transition", extra={"transition": "session_fail", "route": key, "failure_count": self._failure_counts[key]})
+ return self._hard_failure(target)
+
+ def bound_transport_route(self) -> ProbeTarget | None:
+ return self._active_route.target if self._active_route else None
+
+ async def shutdown(self) -> None:
+ self._stopped = True
+ for task in list(self._scan_tasks):
+ task.cancel()
+ if self._scan_tasks:
+ await asyncio.gather(*self._scan_tasks, return_exceptions=True)
+ self._scan_tasks.clear()
+
+ def _route_key(self, target: ProbeTarget) -> str:
+ return f"{target.ip}:{target.port}:{target.sni}:{target.transport_profile}"
+
+ def _hard_failure(self, target: ProbeTarget) -> bool:
+ return self._failure_counts.get(self._route_key(target), 0) >= self.cfg.circuit_breaker_failures
diff --git a/src/core/adaptive_transport/hygiene.py b/src/core/adaptive_transport/hygiene.py
new file mode 100644
index 0000000..5182efc
--- /dev/null
+++ b/src/core/adaptive_transport/hygiene.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+import ipaddress
+
+
+def validate_public_ip(ip: str) -> str:
+ try:
+ addr = ipaddress.ip_address(ip)
+ except ValueError as exc:
+ raise ValueError(f"invalid ip: {ip}") from exc
+
+ if (
+ addr.is_private
+ or addr.is_multicast
+ or addr.is_loopback
+ or addr.is_reserved
+ or addr.is_unspecified
+ or addr.is_link_local
+ ):
+ raise ValueError(f"non-public ip rejected: {ip}")
+ return ip
diff --git a/src/core/adaptive_transport/models.py b/src/core/adaptive_transport/models.py
new file mode 100644
index 0000000..56bc159
--- /dev/null
+++ b/src/core/adaptive_transport/models.py
@@ -0,0 +1,51 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Literal
+import time
+
+ProbeKind = Literal["tcp_syn", "tls", "h2_preface", "quic"]
+
+
+@dataclass(slots=True, frozen=True)
+class ProbeTarget:
+ ip: str
+ port: int
+ sni: str
+ alpn: tuple[str, ...] = ("h2", "http/1.1")
+ transport_profile: str = "vless_reality"
+
+
+@dataclass(slots=True)
+class ProbeObservation:
+ kind: ProbeKind
+ ok: bool
+ latency_ms: float
+ packet_loss: float = 0.0
+
+
+@dataclass(slots=True)
+class RouteScore:
+ target: ProbeTarget
+ median_rtt_ms: float
+ jitter_ms: float
+ packet_loss: float
+ handshake_success_rate: float
+ session_stability: float
+ score: float
+ sampled_at: float = field(default_factory=time.time)
+
+
+@dataclass(slots=True)
+class RuntimeMetrics:
+ disconnects: int = 0
+ retransmissions: int = 0
+ latency_spikes: int = 0
+ packet_delay_variance: float = 0.0
+
+
+@dataclass(slots=True)
+class SessionState:
+ state: Literal["session_start", "session_active", "session_stable_window"] = "session_start"
+ started_at: float = field(default_factory=time.time)
+ stable_since: float | None = None
diff --git a/src/core/adaptive_transport/probe.py b/src/core/adaptive_transport/probe.py
new file mode 100644
index 0000000..3cfb5b6
--- /dev/null
+++ b/src/core/adaptive_transport/probe.py
@@ -0,0 +1,129 @@
+from __future__ import annotations
+
+import asyncio
+import socket
+import ssl
+import statistics
+import time
+from dataclasses import dataclass
+
+from .models import ProbeObservation, ProbeTarget
+
+
+@dataclass(slots=True)
+class ProbeConfig:
+ timeout_s: float = 2.5
+ retries: int = 3
+
+
+class AsyncRouteProbe:
+ def __init__(self, cfg: ProbeConfig | None = None):
+ self.cfg = cfg or ProbeConfig()
+
+ async def probe(self, target: ProbeTarget, include_quic: bool = False) -> list[ProbeObservation]:
+ samples: list[ProbeObservation] = []
+ for _ in range(self.cfg.retries):
+ samples.append(await self._tcp_syn_probe(target))
+ samples.append(await self._tls_probe(target))
+ samples.append(await self._h2_preface_probe(target))
+ if include_quic:
+ samples.append(await self._quic_probe(target))
+ return samples
+
+ async def _tcp_syn_probe(self, target: ProbeTarget) -> ProbeObservation:
+ start = time.perf_counter()
+ ok = False
+ writer = None
+ try:
+ fut = asyncio.open_connection(target.ip, target.port)
+ _, writer = await asyncio.wait_for(fut, timeout=self.cfg.timeout_s)
+ ok = True
+ except Exception:
+ ok = False
+ finally:
+ if writer is not None:
+ writer.close()
+ await writer.wait_closed()
+ return ProbeObservation("tcp_syn", ok, (time.perf_counter() - start) * 1000)
+
+ async def _tls_probe(self, target: ProbeTarget) -> ProbeObservation:
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ ctx.set_alpn_protocols(list(target.alpn))
+ start = time.perf_counter()
+ writer = None
+ ok = False
+ try:
+ reader, writer = await asyncio.wait_for(
+ asyncio.open_connection(target.ip, target.port, ssl=ctx, server_hostname=target.sni),
+ timeout=self.cfg.timeout_s,
+ )
+ ssl_obj = writer.get_extra_info("ssl_object")
+ ok = bool(reader and ssl_obj and ssl_obj.version())
+ except Exception:
+ ok = False
+ finally:
+ if writer is not None:
+ writer.close()
+ await writer.wait_closed()
+ return ProbeObservation("tls", ok, (time.perf_counter() - start) * 1000)
+
+ async def _h2_preface_probe(self, target: ProbeTarget) -> ProbeObservation:
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ ctx.set_alpn_protocols(["h2"])
+ start = time.perf_counter()
+ writer = None
+ reader = None
+ ok = False
+ try:
+ reader, writer = await asyncio.wait_for(
+ asyncio.open_connection(target.ip, target.port, ssl=ctx, server_hostname=target.sni),
+ timeout=self.cfg.timeout_s,
+ )
+ ssl_obj = writer.get_extra_info("ssl_object")
+ if not ssl_obj or ssl_obj.selected_alpn_protocol() != "h2":
+ raise RuntimeError("h2 alpn not negotiated")
+ writer.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
+ await writer.drain()
+ frame_header = await asyncio.wait_for(reader.readexactly(9), timeout=0.4)
+ ok = len(frame_header) == 9
+ except Exception:
+ ok = False
+ finally:
+ if writer is not None:
+ writer.close()
+ await writer.wait_closed()
+ return ProbeObservation("h2_preface", ok, (time.perf_counter() - start) * 1000)
+
+ async def _quic_probe(self, target: ProbeTarget) -> ProbeObservation:
+ loop = asyncio.get_running_loop()
+ start = time.perf_counter()
+ udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ udp_socket.setblocking(False)
+ ok = False
+ try:
+ udp_socket.connect((target.ip, target.port))
+ initial = bytes.fromhex("c300000001088394c8f03e5157080000449e00000002")
+ await loop.sock_sendall(udp_socket, initial)
+ response = await asyncio.wait_for(loop.sock_recv(udp_socket, 1200), timeout=min(self.cfg.timeout_s, 0.5))
+ ok = bool(response)
+ except Exception:
+ ok = False
+ finally:
+ udp_socket.close()
+ return ProbeObservation("quic", ok, (time.perf_counter() - start) * 1000)
+
+
+def summarize(samples: list[ProbeObservation]) -> tuple[float, float, float, float, float]:
+ lat = [s.latency_ms for s in samples if s.ok]
+ if not lat:
+ return 9_999.0, 5_000.0, 1.0, 0.0, 0.0
+ median = statistics.median(lat)
+ jitter = statistics.pstdev(lat) if len(lat) > 1 else 0.0
+ loss = 1.0 - (len(lat) / max(1, len(samples)))
+ handshake_success = len([s for s in samples if s.kind == "tls" and s.ok]) / max(1, len([s for s in samples if s.kind == "tls"]))
+ stability = 1.0 / (1.0 + jitter + (loss * 100.0))
+ return median, jitter, loss, handshake_success, stability
diff --git a/src/core/adaptive_transport/storage.py b/src/core/adaptive_transport/storage.py
new file mode 100644
index 0000000..2560b82
--- /dev/null
+++ b/src/core/adaptive_transport/storage.py
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+import asyncio
+import sqlite3
+import time
+from pathlib import Path
+
+from .models import ProbeTarget, RouteScore, RuntimeMetrics
+
+
+class RouteIntelligenceStore:
+ def __init__(self, path: str):
+ self.path = Path(path)
+ self._lock = asyncio.Lock()
+ self._init_db()
+
+ def _init_db(self) -> None:
+ with sqlite3.connect(self.path) as conn:
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS route_scores (
+ ip TEXT NOT NULL,
+ port INTEGER NOT NULL,
+ sni TEXT NOT NULL,
+ profile TEXT NOT NULL,
+ sampled_at REAL NOT NULL,
+ median_rtt_ms REAL NOT NULL,
+ jitter_ms REAL NOT NULL,
+ packet_loss REAL NOT NULL,
+ handshake_success REAL NOT NULL,
+ session_stability REAL NOT NULL,
+ score REAL NOT NULL,
+ success_count INTEGER NOT NULL DEFAULT 0,
+ failure_count INTEGER NOT NULL DEFAULT 0,
+ cooldown_until REAL NOT NULL DEFAULT 0,
+ retired INTEGER NOT NULL DEFAULT 0
+ )
+ """
+ )
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS route_runtime_metrics (
+ ip TEXT NOT NULL,
+ port INTEGER NOT NULL,
+ sni TEXT NOT NULL,
+ profile TEXT NOT NULL,
+ observed_at REAL NOT NULL,
+ disconnects INTEGER NOT NULL,
+ retransmissions INTEGER NOT NULL,
+ latency_spikes INTEGER NOT NULL,
+ packet_delay_variance REAL NOT NULL
+ )
+ """
+ )
+
+ async def record_score(self, score: RouteScore) -> None:
+ async with self._lock:
+ await asyncio.to_thread(self._record_score_sync, score)
+
+ def _record_score_sync(self, score: RouteScore) -> None:
+ with sqlite3.connect(self.path, timeout=30) as conn:
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.execute(
+ """INSERT INTO route_scores
+ (ip,port,sni,profile,sampled_at,median_rtt_ms,jitter_ms,packet_loss,handshake_success,session_stability,score,success_count,failure_count)
+ VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
+ (
+ score.target.ip,
+ score.target.port,
+ score.target.sni,
+ score.target.transport_profile,
+ score.sampled_at,
+ score.median_rtt_ms,
+ score.jitter_ms,
+ score.packet_loss,
+ score.handshake_success_rate,
+ score.session_stability,
+ score.score,
+ 1 if score.score > 0 else 0,
+ 0 if score.score > 0 else 1,
+ ),
+ )
+
+ async def record_runtime_metrics(self, target: ProbeTarget, metrics: RuntimeMetrics, observed_at: float | None = None) -> None:
+ async with self._lock:
+ await asyncio.to_thread(self._record_runtime_metrics_sync, target, metrics, observed_at or time.time())
+
+ def _record_runtime_metrics_sync(self, target: ProbeTarget, metrics: RuntimeMetrics, observed_at: float) -> None:
+ with sqlite3.connect(self.path, timeout=30) as conn:
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.execute(
+ """INSERT INTO route_runtime_metrics
+ (ip,port,sni,profile,observed_at,disconnects,retransmissions,latency_spikes,packet_delay_variance)
+ VALUES (?,?,?,?,?,?,?,?,?)""",
+ (target.ip, target.port, target.sni, target.transport_profile, observed_at, metrics.disconnects, metrics.retransmissions, metrics.latency_spikes, metrics.packet_delay_variance),
+ )
+
+ async def top_routes(self, limit: int = 5, decay_window_s: float = 900.0) -> list[tuple[str, int, str, str, float]]:
+ now = time.time()
+ async with self._lock:
+ return await asyncio.to_thread(self._top_routes_sync, limit, now, decay_window_s)
+
+ def _top_routes_sync(self, limit: int, now: float, decay_window_s: float):
+ with sqlite3.connect(self.path) as conn:
+ rows = conn.execute(
+ """SELECT s.ip,s.port,s.sni,s.profile,
+ AVG(
+ s.score * CASE
+ WHEN (? - s.sampled_at) >= ? THEN 0.05
+ ELSE EXP(-((? - s.sampled_at) / ?))
+ END
+ )
+ - COALESCE(AVG(ABS(s.score - ss.mean_score)), 0.0) * 0.35
+ - COALESCE(SUM(m.disconnects + m.retransmissions + m.latency_spikes) * 0.02, 0.0)
+ - COALESCE(AVG(m.packet_delay_variance) * 0.01, 0.0) AS decayed
+ FROM route_scores s
+ LEFT JOIN (
+ SELECT ip,port,sni,profile,AVG(score) AS mean_score FROM route_scores GROUP BY ip,port,sni,profile
+ ) ss ON ss.ip=s.ip AND ss.port=s.port AND ss.sni=s.sni AND ss.profile=s.profile
+ LEFT JOIN route_runtime_metrics m ON m.ip=s.ip AND m.port=s.port AND m.sni=s.sni AND m.profile=s.profile
+ WHERE s.retired = 0 AND s.cooldown_until < ?
+ GROUP BY s.ip,s.port,s.sni,s.profile
+ ORDER BY decayed DESC
+ LIMIT ?""",
+ (now, max(1.0, decay_window_s), now, max(1.0, decay_window_s), now, max(1.0, decay_window_s), now, limit),
+ ).fetchall()
+ return rows
diff --git a/src/core/adaptive_transport/transport_profiles.py b/src/core/adaptive_transport/transport_profiles.py
new file mode 100644
index 0000000..2e4eabb
--- /dev/null
+++ b/src/core/adaptive_transport/transport_profiles.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True, slots=True)
+class TransportProfile:
+ name: str
+ protocol: str
+ fallback: tuple[str, ...]
+ browser_fingerprint: str
+
+
+PROFILES = {
+ "vless_reality": TransportProfile("vless_reality", "tcp+tls", ("h2", "h3", "udp_over_tcp"), "chrome_124"),
+ "http2_fallback": TransportProfile("http2_fallback", "h2", ("h1",), "firefox_125"),
+ "http3_fallback": TransportProfile("http3_fallback", "h3", ("h2",), "chrome_124"),
+ "udp_over_tcp": TransportProfile("udp_over_tcp", "tcp", ("h2",), "safari_17"),
+ "quic": TransportProfile("quic", "udp", ("h3", "h2"), "chrome_124"),
+}
diff --git a/src/core/config_sync.py b/src/core/config_sync.py
new file mode 100644
index 0000000..64a1e03
--- /dev/null
+++ b/src/core/config_sync.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+import hashlib
+import json
+from pathlib import Path
+
+
+class ConfigSyncError(RuntimeError):
+ pass
+
+
+class ConfigSyncManager:
+ def __init__(self, repo_root: Path):
+ self.repo_root = repo_root
+ self.config_path = repo_root / "config.json"
+ self.code_gs_path = repo_root / "apps_script" / "Code.gs"
+ self.worker_path = repo_root / "apps_script" / "cloudflare_worker.js"
+ self.state_path = repo_root / ".config_sync_state.json"
+
+ def sync_all_configs(self, config: dict) -> dict:
+ if not isinstance(config, dict):
+ raise ConfigSyncError("config must be a JSON object")
+ script_ids = config.get("script_ids") or config.get("script_id")
+ if not script_ids:
+ raise ConfigSyncError("missing script_id/script_ids")
+ auth_key = str(config.get("auth_key", "")).strip()
+ if not auth_key:
+ raise ConfigSyncError("missing auth_key")
+
+ artifacts = {
+ "config.json": self._normalized_config(config),
+ "apps_script/Code.gs": self._sync_code_gs(auth_key),
+ "apps_script/cloudflare_worker.js": self._sync_worker_psk(config),
+ }
+ self._write_sync_state(artifacts)
+ return artifacts
+
+ def _normalized_config(self, config: dict) -> str:
+ return json.dumps(config, ensure_ascii=False, indent=2) + "\n"
+
+ def _sync_code_gs(self, auth_key: str) -> str:
+ content = self.code_gs_path.read_text(encoding="utf-8")
+ marker = 'const AUTH_KEY = "'
+ idx = content.find(marker)
+ if idx < 0:
+ raise ConfigSyncError("AUTH_KEY declaration not found in Code.gs")
+ end = content.find('";', idx + len(marker))
+ if end < 0:
+ raise ConfigSyncError("AUTH_KEY declaration malformed in Code.gs")
+ return content[: idx + len(marker)] + auth_key + content[end:]
+
+ def _sync_worker_psk(self, config: dict) -> str:
+ content = self.worker_path.read_text(encoding="utf-8")
+ psk = str(config.get("exit_node", {}).get("psk") or config.get("auth_key", "")).strip()
+ marker = 'const PSK = "'
+ idx = content.find(marker)
+ if idx < 0:
+ raise ConfigSyncError("PSK declaration not found in cloudflare_worker.js")
+ end = content.find('";', idx + len(marker))
+ if end < 0:
+ raise ConfigSyncError("PSK declaration malformed in cloudflare_worker.js")
+ return content[: idx + len(marker)] + psk + content[end:]
+
+ def _write_sync_state(self, artifacts: dict[str, str]) -> None:
+ checksums = {k: hashlib.sha256(v.encode("utf-8")).hexdigest() for k, v in artifacts.items()}
+ self.state_path.write_text(json.dumps({"checksums": checksums}, indent=2) + "\n", encoding="utf-8")
diff --git a/src/core/dns_cache.py b/src/core/dns_cache.py
new file mode 100644
index 0000000..e43952e
--- /dev/null
+++ b/src/core/dns_cache.py
@@ -0,0 +1,35 @@
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass
+
+
+@dataclass(slots=True)
+class DnsEntry:
+ families: list[tuple[int, str]]
+ expires_at: float
+
+
+class DnsCache:
+ def __init__(self, ttl_s: float = 120.0, max_items: int = 4096):
+ self.ttl_s = max(1.0, ttl_s)
+ self.max_items = max(64, max_items)
+ self._cache: dict[str, DnsEntry] = {}
+
+ def get(self, host: str) -> list[tuple[int, str]] | None:
+ item = self._cache.get(host)
+ now = time.time()
+ if not item:
+ return None
+ if item.expires_at <= now:
+ self._cache.pop(host, None)
+ return None
+ return list(item.families)
+
+ def set(self, host: str, families: list[tuple[int, str]]) -> None:
+ if len(self._cache) >= self.max_items:
+ # bounded memory: evict oldest inserted key
+ oldest = next(iter(self._cache.keys()), None)
+ if oldest is not None:
+ self._cache.pop(oldest, None)
+ self._cache[host] = DnsEntry(families=list(families), expires_at=time.time() + self.ttl_s)
diff --git a/src/core/google_ip_scanner.py b/src/core/google_ip_scanner.py
index 9a17c5a..96dc30a 100644
--- a/src/core/google_ip_scanner.py
+++ b/src/core/google_ip_scanner.py
@@ -9,7 +9,10 @@
from __future__ import annotations
import asyncio
+import contextlib
+import json
import logging
+import random
import ssl
import time
from dataclasses import dataclass
@@ -32,6 +35,84 @@ def ok(self) -> bool:
return self.latency_ms is not None
+def _classify_fronted_failure(response: bytes, expected_host: str) -> str | None:
+ text = response.decode("utf-8", errors="ignore")
+ lower = text.lower()
+ if " ProbeResult:
+ base_delay = 0.2
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ ctx.set_alpn_protocols(["h2", "http/1.1"])
+ start_time = time.time()
+ last_error = "probe_failed"
+ for attempt in range(retries):
+ writer = None
+ try:
+ # TLS warm-up + relay-aligned TLS path to front domain with explicit IPv4 target.
+ reader, writer = await asyncio.wait_for(
+ asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=sni),
+ timeout=timeout,
+ )
+ ssl_obj = writer.get_extra_info("ssl_object")
+ if not ssl_obj:
+ return ProbeResult(ip=ip, error="tls_warmup_failed")
+ if ssl_obj.selected_alpn_protocol() == "h2":
+ writer.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
+ await writer.drain()
+ head = await asyncio.wait_for(reader.read(256), timeout=timeout)
+ else:
+ req = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n".encode()
+ writer.write(req)
+ await writer.drain()
+ head = await asyncio.wait_for(reader.read(512), timeout=timeout)
+ failure = _classify_fronted_failure(head, sni)
+ if failure:
+ return ProbeResult(ip=ip, error=failure)
+ if not head:
+ return ProbeResult(ip=ip, error="empty_response")
+ return ProbeResult(ip=ip, latency_ms=int((time.time() - start_time) * 1000))
+ except asyncio.TimeoutError:
+ last_error = "timeout"
+ except OSError as e:
+ last_error = f"network_error:{e.strerror or str(e)}"
+ except Exception as e:
+ last_error = f"probe_failed:{type(e).__name__}"
+ finally:
+ if writer is not None:
+ writer.close()
+ with contextlib.suppress(Exception):
+ await writer.wait_closed()
+ if attempt < retries - 1:
+ await asyncio.sleep((base_delay * (2**attempt)) + random.uniform(0.0, 0.075))
+ return ProbeResult(ip=ip, error=last_error)
+
+
async def _probe_ip(
ip: str,
sni: str,
@@ -51,60 +132,7 @@ async def _probe_ip(
ProbeResult with latency_ms (if successful) or error message.
"""
async with semaphore:
- start_time = time.time()
- try:
- # Create SSL context that skips certificate verification
- ctx = ssl.create_default_context()
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
-
- # Connect to IP:443 with SNI set to the fronting domain
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(
- ip,
- 443,
- ssl=ctx,
- server_hostname=sni,
- ),
- timeout=timeout,
- )
-
- # Send minimal HTTP HEAD request
- request = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n"
- writer.write(request.encode())
- await writer.drain()
-
- # Read response header (first 256 bytes is plenty for HTTP status)
- response = await asyncio.wait_for(reader.read(256), timeout=timeout)
-
- writer.close()
- try:
- await writer.wait_closed()
- except Exception:
- pass
-
- # Check if we got an HTTP response
- if not response:
- return ProbeResult(ip=ip, error="empty response")
-
- response_str = response.decode("utf-8", errors="ignore")
- if not response_str.startswith("HTTP/"):
- return ProbeResult(ip=ip, error=f"invalid response: {response_str[:30]!r}")
-
- # Success — return latency in milliseconds
- elapsed_ms = int((time.time() - start_time) * 1000)
- return ProbeResult(ip=ip, latency_ms=elapsed_ms)
-
- except asyncio.TimeoutError:
- return ProbeResult(ip=ip, error="timeout")
- except ConnectionRefusedError:
- return ProbeResult(ip=ip, error="connection refused")
- except ConnectionResetError:
- return ProbeResult(ip=ip, error="connection reset")
- except OSError as e:
- return ProbeResult(ip=ip, error=f"network error: {e.strerror or str(e)}")
- except Exception as e:
- return ProbeResult(ip=ip, error=f"probe failed: {type(e).__name__}")
+ return await probe_via_fronted_h2(ip=ip, sni=sni, timeout=timeout)
async def run(front_domain: str) -> bool:
diff --git a/src/core/logging_utils.py b/src/core/logging_utils.py
index 8f3df11..c2c1c5b 100644
--- a/src/core/logging_utils.py
+++ b/src/core/logging_utils.py
@@ -187,7 +187,8 @@ def format(self, record: logging.LogRecord) -> str:
if self.use_color:
time_part = f"{DIM}{FG_GRAY}{time_part}{RESET}"
- line = f"{time_part} {level_part} {comp_part} {message}"
+ event_meta = _event_metadata(record)
+ line = f"{time_part} {level_part} {comp_part} {message}{event_meta}"
# Exception tracebacks: render dimmed below the main line.
if record.exc_info:
@@ -204,6 +205,23 @@ def format(self, record: logging.LogRecord) -> str:
return line
+def _event_metadata(record: logging.LogRecord) -> str:
+ """Render common structured metadata fields when present."""
+ fields = []
+ for key in (
+ "request_type",
+ "status_code",
+ "success",
+ "quota_state",
+ "relay_state",
+ ):
+ if hasattr(record, key):
+ fields.append(f"{key}={getattr(record, key)}")
+ if not fields:
+ return ""
+ return " {" + ", ".join(fields) + "}"
+
+
# ─── public API ────────────────────────────────────────────────────────────
def configure(level: str = "INFO", *, stream=None) -> None:
diff --git a/src/core/quota_monitor.py b/src/core/quota_monitor.py
new file mode 100644
index 0000000..0be51f5
--- /dev/null
+++ b/src/core/quota_monitor.py
@@ -0,0 +1,71 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+import time
+
+
+@dataclass(slots=True)
+class ScriptQuotaStat:
+ success: int = 0
+ failure: int = 0
+ recent_errors: list[str] = field(default_factory=list)
+
+ def state(self) -> str:
+ if any("quota" in e.lower() for e in self.recent_errors[-5:]):
+ return "WARN"
+ if self.failure > (self.success * 2 + 3):
+ return "DEAD"
+ return "GOOD"
+
+
+class QuotaMonitor:
+ def __init__(self):
+ self._stats: dict[str, ScriptQuotaStat] = {}
+ self._events: dict[str, list[tuple[float, bool]]] = {}
+ self._cooldown_until: dict[str, float] = {}
+
+ def record(self, script_id: str, ok: bool, error: str | None = None) -> None:
+ stat = self._stats.setdefault(script_id, ScriptQuotaStat())
+ self._events.setdefault(script_id, []).append((time.time(), ok))
+ if len(self._events[script_id]) > 2000:
+ self._events[script_id] = self._events[script_id][-2000:]
+ if ok:
+ stat.success += 1
+ else:
+ stat.failure += 1
+ if error:
+ stat.recent_errors.append(error)
+ if len(stat.recent_errors) > 50:
+ stat.recent_errors = stat.recent_errors[-50:]
+ if "quota" in error.lower():
+ self._cooldown_until[script_id] = time.time() + 900.0
+
+ def snapshot(self) -> dict[str, str]:
+ return {sid: self.state(sid) for sid in self._stats.keys()}
+
+ def state(self, script_id: str) -> str:
+ stat = self._stats.get(script_id)
+ if stat is None:
+ return "UNKNOWN"
+ now = time.time()
+ if self._cooldown_until.get(script_id, 0.0) > now:
+ return "COOLDOWN"
+ return stat.state()
+
+ def success_rate(self, script_id: str, window_s: float = 900.0) -> float:
+ now = time.time()
+ events = self._events.get(script_id, [])
+ recent = [ok for ts, ok in events if ts >= now - max(1.0, window_s)]
+ if not recent:
+ return 0.0
+ return sum(1 for ok in recent if ok) / len(recent)
+
+ def predict_exhaustion_risk(self, script_id: str) -> str:
+ sr = self.success_rate(script_id)
+ if self.state(script_id) == "COOLDOWN":
+ return "HIGH"
+ if sr < 0.35:
+ return "HIGH"
+ if sr < 0.65:
+ return "MEDIUM"
+ return "LOW"
diff --git a/src/proxy/proxy_server.py b/src/proxy/proxy_server.py
index c6c2fe9..78e79ba 100644
--- a/src/proxy/proxy_server.py
+++ b/src/proxy/proxy_server.py
@@ -38,6 +38,7 @@
TRACE_HOST_SUFFIXES,
UNCACHEABLE_HEADER_NAMES,
)
+from core.dns_cache import DnsCache
from relay.domain_fronter import DomainFronter
from .socks5 import negotiate_socks5
from .proxy_support import (
@@ -96,6 +97,10 @@ def __init__(self, config: dict):
self._tcp_connect_timeout = self._cfg_float(
config, "tcp_connect_timeout", TCP_CONNECT_TIMEOUT, minimum=1.0,
)
+ self._dns_cache = DnsCache(
+ ttl_s=float(config.get("dns_cache_ttl_s", 120.0)),
+ max_items=int(config.get("dns_cache_max_items", 4096)),
+ )
self._download_min_size = self._cfg_int(
config, "chunked_download_min_size", 5 * 1024 * 1024, minimum=0,
)
@@ -520,7 +525,11 @@ async def _on_socks_client(self, reader: asyncio.StreamReader,
result = await negotiate_socks5(reader, writer)
if result is None:
return
- host, port = result
+ cmd, host, port = result
+ if cmd == "udp_associate":
+ log.info("SOCKS5 UDP ASSOCIATE requested by %s", addr)
+ await self._handle_socks5_udp_associate(reader, writer)
+ return
log.info("SOCKS5 CONNECT → %s:%d", host, port)
await self._handle_target_tunnel(host, port, reader, writer)
except asyncio.IncompleteReadError:
@@ -539,6 +548,29 @@ async def _on_socks_client(self, reader: asyncio.StreamReader,
except Exception:
pass
+
+
+ async def _handle_socks5_udp_associate(self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter):
+ """Handle SOCKS5 UDP ASSOCIATE for low-latency UDP apps (e.g., games)."""
+ loop = asyncio.get_running_loop()
+ transport, protocol = await loop.create_datagram_endpoint(
+ lambda: _Socks5UdpRelayProtocol(self.fronter),
+ local_addr=(self.socks_host, 0),
+ )
+ sockname = transport.get_extra_info("sockname")
+ bind_ip = sockname[0] if sockname else self.socks_host
+ bind_port = sockname[1] if sockname else 0
+
+ try:
+ writer.write(b"\x05\x00\x00\x01" + socket.inet_aton(bind_ip) + bind_port.to_bytes(2, "big"))
+ await writer.drain()
+ await reader.read()
+ finally:
+ transport.close()
+
+
# ── CONNECT (HTTPS tunnelling) ────────────────────────────────
async def _do_connect(self, target: str, reader, writer):
@@ -782,30 +814,35 @@ async def _open_tcp_connection(self, target: str, port: int,
ipaddress.ip_address(lookup_target)
candidates = [(0, lookup_target)]
except ValueError:
- try:
- infos = await asyncio.wait_for(
- loop.getaddrinfo(
- lookup_target,
- port,
- family=socket.AF_UNSPEC,
- type=socket.SOCK_STREAM,
- ),
- timeout=timeout,
- )
- except Exception as exc:
- raise OSError(f"dns lookup failed for {lookup_target}: {exc!r}") from exc
-
- candidates = []
- seen = set()
- for family, _type, _proto, _canon, sockaddr in infos:
- ip = sockaddr[0]
- key = (family, ip)
- if key in seen:
- continue
- seen.add(key)
- candidates.append((family, ip))
-
- candidates.sort(key=lambda item: 0 if item[0] == socket.AF_INET else 1)
+ cached = self._dns_cache.get(lookup_target)
+ if cached is not None:
+ candidates = cached
+ else:
+ try:
+ infos = await asyncio.wait_for(
+ loop.getaddrinfo(
+ lookup_target,
+ port,
+ family=socket.AF_UNSPEC,
+ type=socket.SOCK_STREAM,
+ ),
+ timeout=timeout,
+ )
+ except Exception as exc:
+ raise OSError(f"dns lookup failed for {lookup_target}: {exc!r}") from exc
+
+ candidates = []
+ seen = set()
+ for family, _type, _proto, _canon, sockaddr in infos:
+ ip = sockaddr[0]
+ key = (family, ip)
+ if key in seen:
+ continue
+ seen.add(key)
+ candidates.append((family, ip))
+
+ candidates.sort(key=lambda item: 0 if item[0] == socket.AF_INET else 1)
+ self._dns_cache.set(lookup_target, candidates)
for family, ip in candidates:
try:
@@ -1408,3 +1445,72 @@ async def _do_http(self, header_block: bytes, reader, writer):
writer.write(response)
await writer.drain()
+
+
+class _Socks5UdpRelayProtocol(asyncio.DatagramProtocol):
+ """Best-effort SOCKS5 UDP relay (single-hop direct UDP forwarding)."""
+
+ def __init__(self, fronter):
+ self.transport: asyncio.DatagramTransport | None = None
+ self._client_addr = None
+ self._fronter = fronter
+ self._loop = asyncio.get_running_loop()
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+
+ async def _forward_udp(self, dst: str, dport: int, payload: bytes):
+ if self.transport is None or self._client_addr is None:
+ return
+ relayed = await self._fronter.relay_udp_packet(dst, dport, payload)
+ if relayed is not None:
+ header = self._build_socks_header(dst, dport)
+ if header is not None:
+ self.transport.sendto(header + relayed, self._client_addr)
+ return
+ self.transport.sendto(payload, (dst, dport))
+
+ @staticmethod
+ def _build_socks_header(host: str, port: int) -> bytes | None:
+ try:
+ host_raw = socket.inet_aton(host)
+ return b"\x00\x00\x00\x01" + host_raw + port.to_bytes(2, "big")
+ except OSError:
+ return None
+ def datagram_received(self, data, addr):
+ if self.transport is None:
+ return
+
+ if self._client_addr is None or addr == self._client_addr:
+ self._client_addr = addr
+ if len(data) < 10 or data[2] != 0x00:
+ return
+ atyp = data[3]
+ pos = 4
+ if atyp == 0x01: # IPv4
+ dst = socket.inet_ntoa(data[pos:pos + 4])
+ pos += 4
+ elif atyp == 0x03: # Domain
+ ln = data[pos]
+ pos += 1
+ dst = data[pos:pos + ln].decode(errors="replace")
+ pos += ln
+ elif atyp == 0x04: # IPv6
+ dst = socket.inet_ntop(socket.AF_INET6, data[pos:pos + 16])
+ pos += 16
+ else:
+ return
+ dport = int.from_bytes(data[pos:pos + 2], "big")
+ payload = data[pos + 2:]
+ self._loop.create_task(self._forward_udp(dst, dport, payload))
+ return
+
+ # response from remote server -> encapsulate back to client
+ host, port = addr[0], addr[1]
+ try:
+ host_raw = socket.inet_aton(host)
+ header = b"\x00\x00\x00\x01" + host_raw + port.to_bytes(2, "big")
+ except OSError:
+ return
+ self.transport.sendto(header + data, self._client_addr)
diff --git a/src/proxy/socks5.py b/src/proxy/socks5.py
index 6af0375..97fd920 100644
--- a/src/proxy/socks5.py
+++ b/src/proxy/socks5.py
@@ -22,8 +22,8 @@
async def negotiate_socks5(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
-) -> tuple[str, int] | None:
- """Perform a SOCKS5 handshake and return the requested (host, port).
+ ) -> tuple[str, str, int] | None:
+ """Perform a SOCKS5 handshake and return the requested SOCKS command and destination tuple.
Sends protocol-level replies directly to *writer*. Returns ``None``
and leaves the connection in a closed state if negotiation fails at
@@ -55,8 +55,8 @@ async def negotiate_socks5(
# ── Request ───────────────────────────────────────────────────
req = await asyncio.wait_for(reader.readexactly(4), timeout=15)
ver, cmd, _rsv, atyp = req
- if ver != 5 or cmd != 0x01:
- # Only CONNECT (0x01) is supported
+ if ver != 5 or cmd not in (0x01, 0x03):
+ # Support CONNECT (0x01) and UDP ASSOCIATE (0x03) only.
writer.write(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00")
await writer.drain()
return None
@@ -85,4 +85,5 @@ async def negotiate_socks5(
writer.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
await writer.drain()
- return host, port
+ cmd_name = "connect" if cmd == 0x01 else "udp_associate"
+ return cmd_name, host, port
diff --git a/src/relay/domain_fronter.py b/src/relay/domain_fronter.py
index 842eeab..d9e264f 100644
--- a/src/relay/domain_fronter.py
+++ b/src/relay/domain_fronter.py
@@ -1562,6 +1562,44 @@ async def relay(self, method: str, url: str,
latency_ns = int((time.perf_counter() - t0) * 1e9)
self._record_site(url, len(result), latency_ns, errored)
+ async def relay_udp_packet(self, host: str, port: int, payload: bytes) -> bytes | None:
+ """Relay a single UDP packet via exit node over Apps Script.
+
+ This encapsulates UDP into the existing HTTPS relay chain:
+ client UDP -> SOCKS5 UDP ASSOCIATE -> Apps Script -> exit node -> UDP target
+ """
+ if not self._exit_node_enabled or not self._exit_node_url:
+ return None
+
+ inner = {
+ "k": self._exit_node_psk,
+ "udp": 1,
+ "host": host,
+ "port": int(port),
+ "payload": base64.b64encode(payload).decode(),
+ }
+ inner_json = json.dumps(inner).encode()
+ outer = self._build_payload(
+ "POST",
+ self._exit_node_url,
+ {"Content-Type": "application/json"},
+ inner_json,
+ )
+ outer["ct"] = "application/json"
+
+ raw = await self._batch_submit(outer)
+ _, _, relay_bytes = split_raw_response(raw)
+ obj = load_relay_json(relay_bytes)
+ if not isinstance(obj, dict) or obj.get("e"):
+ return None
+ b64 = obj.get("payload")
+ if not isinstance(b64, str) or not b64:
+ return b""
+ try:
+ return base64.b64decode(b64)
+ except Exception:
+ return None
+
async def _coalesced_submit(self, key: str, payload: dict) -> bytes:
"""Dedup concurrent requests for the same URL (no Range header).
@@ -2851,4 +2889,3 @@ def _parse_batch_body(self, resp_body: bytes,
for item in items:
results.append(parse_relay_json(item, self._max_response_body_bytes))
return results
-
diff --git a/src/relay/relay_response.py b/src/relay/relay_response.py
index 412f38c..8f5f284 100644
--- a/src/relay/relay_response.py
+++ b/src/relay/relay_response.py
@@ -819,4 +819,14 @@ def parse_relay_response(body: bytes, max_body_bytes: int) -> bytes:
log.warning("JSON parse failed. Response: %s", preview[:200])
return error_response(502, error_msg)
+ # New Code.gs contract can wrap legacy payload in:
+ # {ok:boolean, code:string, message:string, data:object|null}
+ if {"ok", "code", "message", "data"}.issubset(data.keys()):
+ if not data.get("ok"):
+ return error_response(502, f"Relay API error [{data.get('code')}]: {data.get('message')}")
+ wrapped = data.get("data")
+ if not isinstance(wrapped, dict):
+ return error_response(502, "Relay API returned invalid data envelope")
+ data = wrapped
+
return parse_relay_json(data, max_body_bytes)
diff --git a/tests/test_adaptive_transport.py b/tests/test_adaptive_transport.py
new file mode 100644
index 0000000..a607724
--- /dev/null
+++ b/tests/test_adaptive_transport.py
@@ -0,0 +1,46 @@
+import pathlib
+import sys
+import tempfile
+import unittest
+
+ROOT = pathlib.Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+if str(SRC) not in sys.path:
+ sys.path.insert(0, str(SRC))
+
+from core.adaptive_transport.engine import AdaptiveRouteEngine
+from core.adaptive_transport.hygiene import validate_public_ip
+from core.adaptive_transport.models import ProbeTarget, RouteScore
+
+
+class HygieneTests(unittest.TestCase):
+ def test_rejects_private(self):
+ with self.assertRaises(ValueError):
+ validate_public_ip("10.0.0.1")
+
+ def test_accepts_public(self):
+ self.assertEqual(validate_public_ip("8.8.8.8"), "8.8.8.8")
+
+
+class EngineTests(unittest.IsolatedAsyncioTestCase):
+ async def test_circuit_breaker(self):
+ with tempfile.TemporaryDirectory() as td:
+ engine = AdaptiveRouteEngine(f"{td}/intel.db")
+ t = ProbeTarget(ip="8.8.8.8", port=443, sni="www.google.com")
+ for _ in range(engine.cfg.circuit_breaker_failures - 1):
+ self.assertFalse(engine.register_route_failure(t))
+ self.assertTrue(engine.register_route_failure(t))
+
+ async def test_select_route_sticky(self):
+ with tempfile.TemporaryDirectory() as td:
+ engine = AdaptiveRouteEngine(f"{td}/intel.db")
+ t = ProbeTarget(ip="8.8.8.8", port=443, sni="www.google.com")
+ r1 = RouteScore(t, 100, 1, 0.0, 1.0, 0.9, 0.8)
+ r2 = RouteScore(t, 90, 1, 0.0, 1.0, 0.9, 0.81)
+ chosen = await engine.select_route([r1], gameplay_active=True)
+ chosen2 = await engine.select_route([r2], gameplay_active=True)
+ self.assertIs(chosen, chosen2)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_config_sync.py b/tests/test_config_sync.py
new file mode 100644
index 0000000..ed6b35a
--- /dev/null
+++ b/tests/test_config_sync.py
@@ -0,0 +1,32 @@
+import json
+import pathlib
+import tempfile
+import unittest
+
+ROOT = pathlib.Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+import sys
+if str(SRC) not in sys.path:
+ sys.path.insert(0, str(SRC))
+
+from core.config_sync import ConfigSyncManager
+
+
+class ConfigSyncTests(unittest.TestCase):
+ def test_sync_updates_codegs_and_worker(self):
+ with tempfile.TemporaryDirectory() as td:
+ repo = pathlib.Path(td)
+ (repo / "apps_script").mkdir()
+ (repo / "apps_script" / "Code.gs").write_text('const AUTH_KEY = "OLD";\n', encoding="utf-8")
+ (repo / "apps_script" / "cloudflare_worker.js").write_text('const PSK = "OLD";\n', encoding="utf-8")
+ cfg = {"script_id": "abc", "auth_key": "NEWKEY", "exit_node": {"psk": "NEWPSK"}}
+ sync = ConfigSyncManager(repo)
+ artifacts = sync.sync_all_configs(cfg)
+ self.assertIn('const AUTH_KEY = "NEWKEY";', artifacts["apps_script/Code.gs"])
+ self.assertIn('const PSK = "NEWPSK";', artifacts["apps_script/cloudflare_worker.js"])
+ state = json.loads((repo / ".config_sync_state.json").read_text(encoding="utf-8"))
+ self.assertIn("checksums", state)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_dns_quota.py b/tests/test_dns_quota.py
new file mode 100644
index 0000000..cfd2439
--- /dev/null
+++ b/tests/test_dns_quota.py
@@ -0,0 +1,43 @@
+import pathlib
+import sys
+import time
+import unittest
+
+ROOT = pathlib.Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+if str(SRC) not in sys.path:
+ sys.path.insert(0, str(SRC))
+
+from core.dns_cache import DnsCache
+from core.quota_monitor import QuotaMonitor
+
+
+class DnsQuotaTests(unittest.TestCase):
+ def test_dns_cache_expiry(self):
+ cache = DnsCache(ttl_s=1.0, max_items=64)
+ cache.set("a.test", [(2, "1.1.1.1")])
+ self.assertIsNotNone(cache.get("a.test"))
+ cache._cache["a.test"].expires_at = time.time() - 1
+ self.assertIsNone(cache.get("a.test"))
+
+ def test_quota_monitor_states(self):
+ qm = QuotaMonitor()
+ qm.record("sid1", ok=True)
+ self.assertEqual(qm.snapshot()["sid1"], "GOOD")
+ for _ in range(5):
+ qm.record("sid2", ok=False, error="quota exceeded")
+ self.assertEqual(qm.snapshot()["sid2"], "COOLDOWN")
+ self.assertEqual(qm.predict_exhaustion_risk("sid2"), "HIGH")
+
+ def test_quota_success_rate_prediction(self):
+ qm = QuotaMonitor()
+ for _ in range(8):
+ qm.record("sid3", ok=True)
+ for _ in range(2):
+ qm.record("sid3", ok=False, error="transient")
+ self.assertGreaterEqual(qm.success_rate("sid3"), 0.75)
+ self.assertEqual(qm.predict_exhaustion_risk("sid3"), "LOW")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_google_ip_scanner_probe.py b/tests/test_google_ip_scanner_probe.py
new file mode 100644
index 0000000..9f7efde
--- /dev/null
+++ b/tests/test_google_ip_scanner_probe.py
@@ -0,0 +1,50 @@
+import asyncio
+import pathlib
+import sys
+import unittest
+from unittest.mock import AsyncMock, Mock, patch
+
+ROOT = pathlib.Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+if str(SRC) not in sys.path:
+ sys.path.insert(0, str(SRC))
+
+from core.google_ip_scanner import _classify_fronted_failure, probe_via_fronted_h2
+
+
+class ScannerProbeTests(unittest.IsolatedAsyncioTestCase):
+ def test_html_detection(self):
+ err = _classify_fronted_failure(b"HTTP/1.1 200 OK\r\n\r\nlogin", "www.google.com")
+ self.assertEqual(err, "html_login_page")
+
+ def test_invalid_json_detection(self):
+ payload = b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{bad-json"
+ err = _classify_fronted_failure(payload, "www.google.com")
+ self.assertEqual(err, "invalid_json_response")
+
+ async def test_timeout_retry_behavior(self):
+ with patch("core.google_ip_scanner.asyncio.open_connection", new=AsyncMock(side_effect=asyncio.TimeoutError)):
+ with patch("core.google_ip_scanner.asyncio.sleep", new=AsyncMock()) as sleep_mock:
+ result = await probe_via_fronted_h2("8.8.8.8", "www.google.com", timeout=0.01, retries=3)
+ self.assertFalse(result.ok)
+ self.assertEqual(result.error, "timeout")
+ self.assertEqual(sleep_mock.await_count, 2)
+
+ async def test_restricted_network_html_classification(self):
+ fake_reader = AsyncMock()
+ fake_reader.read = AsyncMock(return_value=b"HTTP/1.1 200 OK\r\n\r\nSign In")
+ fake_writer = Mock()
+ fake_writer.write = Mock()
+ fake_writer.close = Mock()
+ fake_writer.drain = AsyncMock(return_value=None)
+ fake_writer.wait_closed = AsyncMock(return_value=None)
+ fake_ssl = type("SSL", (), {"selected_alpn_protocol": lambda self: "http/1.1"})()
+ fake_writer.get_extra_info = lambda key: fake_ssl if key == "ssl_object" else None
+ with patch("core.google_ip_scanner.asyncio.open_connection", new=AsyncMock(return_value=(fake_reader, fake_writer))):
+ result = await probe_via_fronted_h2("1.1.1.1", "www.google.com", timeout=0.1, retries=1)
+ self.assertFalse(result.ok)
+ self.assertEqual(result.error, "html_login_page")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_logging_utils.py b/tests/test_logging_utils.py
new file mode 100644
index 0000000..5ef05b9
--- /dev/null
+++ b/tests/test_logging_utils.py
@@ -0,0 +1,30 @@
+import logging
+import pathlib
+import sys
+import unittest
+
+ROOT = pathlib.Path(__file__).resolve().parents[1]
+SRC = ROOT / "src"
+if str(SRC) not in sys.path:
+ sys.path.insert(0, str(SRC))
+
+from core.logging_utils import PrettyFormatter
+
+
+class LoggingUtilsTests(unittest.TestCase):
+ def test_metadata_suffix_rendered(self):
+ fmt = PrettyFormatter(use_color=False)
+ rec = logging.LogRecord(
+ name="Proxy", level=logging.INFO, pathname=__file__, lineno=1, msg="relay event", args=(), exc_info=None
+ )
+ rec.request_type = "POST"
+ rec.status_code = 200
+ rec.success = True
+ out = fmt.format(rec)
+ self.assertIn("request_type=POST", out)
+ self.assertIn("status_code=200", out)
+ self.assertIn("success=True", out)
+
+
+if __name__ == "__main__":
+ unittest.main()