diff --git a/apps_script/Code.gs b/apps_script/Code.gs index 970e98f..67f2a15 100644 --- a/apps_script/Code.gs +++ b/apps_script/Code.gs @@ -43,7 +43,7 @@ const SAFE_REPLAY_METHODS = { GET: 1, HEAD: 1, OPTIONS: 1 }; function doPost(e) { try { var req = JSON.parse(e.postData.contents); - if (req.k !== AUTH_KEY) return _json({ e: "unauthorized" }); + if (req.k !== AUTH_KEY) return _api(false, "unauthorized", "auth key mismatch", null); // Batch mode: { k, q: [...] } if (Array.isArray(req.q)) return _doBatch(req.q); @@ -51,7 +51,7 @@ function doPost(e) { // Single mode return _doSingle(req); } catch (err) { - return _json({ e: String(err) }); + return _api(false, "bad_request", String(err), null); } } @@ -72,14 +72,14 @@ function _maybeGzip(bytes) { function _doSingle(req) { if (!req.u || typeof req.u !== "string" || !req.u.match(/^https?:\/\//i)) { - return _json({ e: "bad url" }); + return _api(false, "bad_url", "url must be http(s)", null); } // Loop guard: refuse to relay back to any Apps Script deployment. // This fires when an exit node URL is misconfigured to point at a GAS // script — without this check the script would call itself indefinitely // and burn through the daily UrlFetch quota in seconds. if (_GAS_URL_RE.test(req.u)) { - return _json({ e: "loop detected: relay target cannot be a Google Apps Script URL" }); + return _api(false, "loop_detected", "relay target cannot be a Google Apps Script URL", null); } var opts = _buildOpts(req); var resp = UrlFetchApp.fetch(req.u, opts); @@ -90,7 +90,7 @@ function _doSingle(req) { b: Utilities.base64Encode(gz.b), }; if (gz.gz) result.gz = 1; - return _json(result); + return _api(true, "ok", "relay_success", result); } function _doBatch(items) { @@ -178,7 +178,7 @@ function _doBatch(items) { } } } - return _json({ q: results }); + return _api(true, "ok", "batch_relay_success", { q: results }); } function _buildOpts(req) { @@ -221,19 +221,21 @@ function _respHeaders(resp) { } function doGet(e) { - return HtmlService.createHtmlOutput( - "My App" + - '' + - "

Welcome

This application is running normally.

" + - "" - ); + return _api(true, "healthy", "deployment is reachable", { + now_ms: Date.now(), + quota_note: "apps script quotas are managed by Google account limits" + }); } -function _json(obj) { - // HtmlService responses can stay on script.google.com for /dev, while - // ContentService commonly bounces through script.googleusercontent.com. - // The Python client extracts the JSON payload from the body either way. - return HtmlService.createHtmlOutput(JSON.stringify(obj)).setXFrameOptionsMode( - HtmlService.XFrameOptionsMode.ALLOWALL - ); +function _api(ok, code, message, data) { + return ContentService + .createTextOutput( + JSON.stringify({ + ok: !!ok, + code: String(code || (ok ? "ok" : "error")), + message: String(message || ""), + data: data === undefined ? null : data + }) + ) + .setMimeType(ContentService.MimeType.JSON); } diff --git a/apps_script/vps_exit_node.py b/apps_script/vps_exit_node.py index 027bbab..7b48fe7 100644 --- a/apps_script/vps_exit_node.py +++ b/apps_script/vps_exit_node.py @@ -34,6 +34,7 @@ import logging import os import re +import socket import socketserver import sys import urllib.error @@ -209,6 +210,23 @@ def _relay_request( } + + +def _relay_udp_packet(host: str, port: int, payload: bytes) -> dict: + """Send one UDP packet and return one response packet (best effort).""" + if not host or port <= 0 or port > 65535: + return {"e": "bad_udp_target"} + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.settimeout(2.0) + sock.sendto(payload, (host, port)) + data, _ = sock.recvfrom(65535) + return {"ok": True, "payload": base64.b64encode(data).decode()} + except Exception as exc: + return {"e": str(exc) or type(exc).__name__} + finally: + sock.close() + # --------------------------------------------------------------------------- # HTTP request handler # --------------------------------------------------------------------------- @@ -265,6 +283,7 @@ def do_POST(self): # noqa: N802 m = str(body.get("m") or "GET").upper() h = _sanitize_headers(body.get("h")) b64 = body.get("b") + udp_mode = bool(body.get("udp")) if not _PSK: self._send_json(500, {"e": "server_psk_missing"}) @@ -275,6 +294,25 @@ def do_POST(self): # noqa: N802 self._send_json(401, {"e": "unauthorized"}) return + + if udp_mode: + host = str(body.get("host") or "") + try: + port = int(body.get("port") or 0) + except Exception: + port = 0 + payload = b"" + pb64 = body.get("payload") + if isinstance(pb64, str) and pb64: + try: + payload = base64.b64decode(pb64) + except Exception: + self._send_json(400, {"e": "bad_udp_payload"}) + return + result = _relay_udp_packet(host, port, payload) + self._send_json(200, result) + return + if not _safe_url(u): self._send_json(400, {"e": "bad_url"}) return diff --git a/docs/GETTING_STARTED.md b/docs/GETTING_STARTED.md index 4a26f85..309620e 100644 --- a/docs/GETTING_STARTED.md +++ b/docs/GETTING_STARTED.md @@ -1,156 +1,218 @@ -# Getting Started +# Getting Started (Complete Guide) -This guide keeps only the setup path most users need. Follow it once, then come back to the root README only when you need another topic. +This is the **full, practical setup guide** for first-time users. +If you follow this page in order, you can go from zero to a working proxy. -## What You Need +--- -- Python 3.10 or newer -- A free Google account -- Git, or the ZIP download from GitHub -- A browser where you can set an HTTP proxy +## Quick Outcome -## 1. Get The Project +When done correctly: +- Local HTTP proxy works at `127.0.0.1:8085` +- Local SOCKS5 proxy works at `127.0.0.1:1080` +- HTTPS websites open without certificate warnings +- You can run route checks with `--scan` and stability-first checks with `--adaptive-scan` -Choose whichever option works on your network. +--- -**Option A: ZIP** +## Prerequisites -[Click to Download](https://github.com/masterking32/MasterHttpRelayVPN/archive/refs/heads/python_testing.zip) +- Python `3.10+` +- A Google account (for Apps Script relay) +- Git (optional, ZIP download also works) +- A browser where you can set manual proxy +--- -**Option B: Git** +## 1) Download The Project + +### Option A — ZIP + +Download and extract: + +- + +Then open terminal in extracted folder. + +### Option B — Git ```bash git clone https://github.com/masterking32/MasterHttpRelayVPN.git cd MasterHttpRelayVPN ``` -## 2. Deploy The Google Relay +--- -The relay is the small Apps Script program that fetches websites for you. +## 2) Deploy The Google Apps Script Relay -1. Open [Google Apps Script](https://script.google.com/) and sign in. +1. Open and sign in. 2. Click **New project**. -3. Delete the default editor content. -4. Open [apps_script/Code.gs](../apps_script/Code.gs), copy everything, and paste it into Apps Script. -5. Change this line to a long secret: +3. Delete default code. +4. Open local file [`apps_script/Code.gs`](../apps_script/Code.gs), copy all code, paste into Apps Script. +5. Change: ```javascript const AUTH_KEY = "your-secret-password-here"; ``` -6. Click **Deploy** -> **New deployment**. + to your own long random secret. + +6. Click **Deploy** → **New deployment**. 7. Select **Web app**. -8. Set **Execute as** to **Me**. -9. Set **Who has access** to **Anyone**. -10. Click **Deploy**, authorize the app, and copy the **Deployment ID**. +8. Set **Execute as** = **Me**. +9. Set **Who has access** = **Anyone**. +10. Deploy, authorize, copy **Deployment ID**. -Keep the `AUTH_KEY` and Deployment ID nearby. You need both locally. +You now need **both** values locally: +- `Deployment ID` +- `AUTH_KEY` -## 3. Run The One-Click Launcher +--- -**Windows** +## 3) Start The App (Recommended) + +### Windows ```cmd start.bat ``` -**Linux / macOS** +### Linux / macOS ```bash chmod +x start.sh ./start.sh ``` -The launcher creates `.venv`, installs dependencies, runs `setup.py` if `config.json` is missing, and starts the proxy. - -If dependency installation fails through PyPI, the launcher retries through the runflare mirror automatically. +What launcher does: +- creates `.venv` +- installs dependencies +- runs setup wizard if `config.json` is missing +- starts proxy -## 4. Answer The Setup Wizard +--- -When the wizard opens: +## 4) Fill Setup Wizard Correctly -1. Enter the same `auth_key` you placed inside [apps_script/Code.gs](../apps_script/Code.gs). -2. Paste the Apps Script Deployment ID. -3. Keep the default HTTP proxy port `8085` unless you already use that port. -4. Keep LAN sharing off unless other devices must use this proxy. +When prompted: +1. `auth_key` = exactly same as `AUTH_KEY` in Apps Script +2. `script_id` = your Deployment ID +3. Keep HTTP port `8085` unless busy +4. Keep LAN sharing disabled unless you need other devices -The wizard writes `config.json` for you. +The wizard creates `config.json`. -## 5. Configure Your Browser +--- -Use the HTTP proxy for normal browsing: +## 5) Configure Browser Proxy | Field | Value | -|-------|-------| +|---|---| | Proxy type | HTTP | | Address | `127.0.0.1` | | Port | `8085` | -Firefox path: **Settings** -> **General** -> **Network Settings** -> **Manual proxy**. Enter `127.0.0.1` and `8085`, then enable the option to also use it for HTTPS. +For Firefox: Settings → General → Network Settings → Manual proxy. +Enable proxy for HTTPS too. -Chrome and Edge use the system proxy on Windows. You can also use extensions such as FoxyProxy or SwitchyOmega for easier switching. +--- -## 6. Install The CA Certificate +## 6) Install Local CA (HTTPS Required) -HTTPS browsing needs the local CA certificate generated by the proxy. The file is created at `ca/ca.crt` after first run. +The proxy generates `ca/ca.crt`. +If auto-install fails, install manually. -**The app tries to install it automatically. If it cannot, install it manually:** +### Windows +1. Open `ca/ca.crt` +2. Install Certificate +3. Current User +4. Place in **Trusted Root Certification Authorities** +5. Restart browser fully -**Windows** +### macOS +1. Open `ca/ca.crt` in Keychain Access +2. Open certificate → Trust +3. Set **Always Trust** +4. Restart browser -1. Double-click `ca/ca.crt`. -2. Choose **Install Certificate**. -3. Choose **Current User**. -4. Choose **Place all certificates in the following store**. -5. Select **Trusted Root Certification Authorities**. -6. Finish, then fully restart your browser. - -**macOS** - -1. Open `ca/ca.crt` in Keychain Access. -2. Find the certificate and open it. -3. Expand **Trust**. -4. Set **When using this certificate** to **Always Trust**. -5. Close the dialog, enter your password, and restart your browser. - -**Linux Ubuntu / Debian** +### Ubuntu / Debian ```bash sudo cp ca/ca.crt /usr/local/share/ca-certificates/masterhttp-relay.crt sudo update-ca-certificates ``` -Restart your browser after installing. +Restart browser. + +### Firefox (if needed) +Firefox can use a separate trust store: +- Settings → Privacy & Security → Certificates → View Certificates → Authorities → Import `ca/ca.crt` +- Enable trust for website identification + +--- + +## 7) Verify It Works + +- Open normal websites through browser proxy. +- If `unauthorized` appears: `AUTH_KEY` mismatch between Apps Script and `config.json`. +- If HTTPS certificate errors appear: CA not trusted correctly. -**Firefox** +--- -Firefox may use a separate certificate store: +## 8) Route Quality Commands -1. Open **Settings** -> **Privacy & Security** -> **Certificates** -> **View Certificates**. -2. Go to **Authorities**. -3. Click **Import** and select `ca/ca.crt`. -4. Enable **Trust this CA to identify websites**. +### Fast reachability scan -## Manual Run Commands +```bash +python main.py --scan +``` -Use these only if you are not using the launcher: +Use suggested `google_ip` in `config.json`. + +### Stability-first adaptive scan (recommended for unstable networks) ```bash +python main.py --adaptive-scan +``` + +This ranking is based on route stability metrics (not only minimum ping). + +--- + +## 9) Manual Start (Without launcher) + +### Windows + +```cmd python -m venv .venv .venv\Scripts\python -m pip install -r requirements.txt .venv\Scripts\python setup.py .venv\Scripts\python main.py ``` -On Linux / macOS, replace `.venv\Scripts\python` with `.venv/bin/python`. +### Linux / macOS + +```bash +python3 -m venv .venv +.venv/bin/python -m pip install -r requirements.txt +.venv/bin/python setup.py +.venv/bin/python main.py +``` + +--- + +## 10) Common Problems (Short) -## Done +- `unauthorized`: auth key mismatch +- proxy connects but sites fail: wrong Deployment ID or script deployment not public +- HTTPS warnings: CA not installed/trusted +- some services block Google egress: use Exit Node guide -When everything is working, the terminal shows the HTTP proxy on `127.0.0.1:8085` and SOCKS5 on `127.0.0.1:1080`. +--- -Next useful pages: +## Next Docs - [Troubleshooting](TROUBLESHOOTING.md) -- [Configuration Reference](CONFIGURATION.md) -- [Exit Node Guide](exit-node/EXIT_NODE_DEPLOYMENT.md) +- [Configuration](CONFIGURATION.md) +- [Exit Node](exit-node/EXIT_NODE_DEPLOYMENT.md) +- [Architecture](ARCHITECTURE.md) diff --git a/docs/fa/GETTING_STARTED.md b/docs/fa/GETTING_STARTED.md index b0102c6..c10d2b3 100644 --- a/docs/fa/GETTING_STARTED.md +++ b/docs/fa/GETTING_STARTED.md @@ -1,70 +1,217 @@ -# شروع سریع +# شروع سریع (راهنمای کامل) -این راهنما مسیر ساده راه‌اندازی را نشان می‌دهد: یک رله Google Apps Script، یک فایل `config.json`، و پراکسی محلی روی سیستم شما. +این صفحه یک **راهنمای کامل و ساده** برای شروع است. +اگر مرحله‌به‌مرحله جلو بروید، در پایان پراکسی شما کاملا کار خواهد کرد. -## 1. دریافت پروژه +--- -**با Git:** +## نتیجه نهایی + +اگر همه چیز درست انجام شود: +- پراکسی HTTP روی `127.0.0.1:8085` فعال می‌شود +- پراکسی SOCKS5 روی `127.0.0.1:1080` فعال می‌شود +- سایت‌های HTTPS بدون خطای گواهی باز می‌شوند +- می‌توانید با `--scan` و `--adaptive-scan` کیفیت مسیر را بررسی کنید + +--- + +## پیش‌نیازها + +- Python نسخه `3.10+` +- یک اکانت Google (برای Apps Script) +- Git (اختیاری؛ دانلود ZIP هم کافی است) +- مرورگری که تنظیم پراکسی دستی داشته باشد + +--- + +## 1) دریافت پروژه + +### روش A — ZIP + +دانلود و extract: +- + +بعد داخل پوشه پروژه terminal باز کنید. + +### روش B — Git ```bash git clone https://github.com/masterking32/MasterHttpRelayVPN.git cd MasterHttpRelayVPN ``` -**با ZIP:** +--- + +## 2) ساخت رله Google Apps Script -- صفحه [GitHub پروژه](https://github.com/masterking32/MasterHttpRelayVPN) را باز کنید. -- روی **Code** -> **Download ZIP** کلیک کنید. -- فایل ZIP را extract کنید. -- داخل پوشه `MasterHttpRelayVPN` یک terminal باز کنید. +1. وارد شوید. +2. روی **New project** کلیک کنید. +3. کد پیش‌فرض را پاک کنید. +4. فایل [`apps_script/Code.gs`](../../apps_script/Code.gs) را باز کنید، کل محتوا را کپی و در Apps Script paste کنید. +5. این مقدار را تغییر دهید: -## 2. ساخت رله Google + ```javascript + const AUTH_KEY = "your-secret-password-here"; + ``` -- به [Google Apps Script](https://script.google.com/) بروید و یک پروژه جدید بسازید. -- محتوای [apps_script/Code.gs](../../apps_script/Code.gs) را داخل فایل `Code.gs` کپی کنید. -- مقدار `AUTH_KEY` را به یک رمز طولانی و تصادفی تغییر دهید. -- از مسیر **Deploy** -> **New deployment** نوع **Web app** را انتخاب کنید. -- گزینه **Execute as** را روی **Me** و گزینه دسترسی را روی **Anyone** بگذارید. -- Deploy کنید و `Deployment ID` را نگه دارید. + و یک رمز طولانی و تصادفی خودتان بگذارید. -بعد از هر تغییر در `Code.gs` باید deployment جدید بسازید. +6. مسیر **Deploy** → **New deployment** را بزنید. +7. نوع **Web app** را انتخاب کنید. +8. **Execute as** را روی **Me** بگذارید. +9. **Who has access** را روی **Anyone** بگذارید. +10. Deploy کنید، دسترسی را تایید کنید، و **Deployment ID** را کپی کنید. -## 3. اجرای لانچر +دو مقدار مهم برای سیستم محلی: +- `Deployment ID` +- `AUTH_KEY` -**Windows:** +--- + +## 3) اجرای برنامه (روش پیشنهادی) + +### Windows ```cmd start.bat ``` -**Linux / macOS:** +### Linux / macOS ```bash chmod +x start.sh ./start.sh ``` -لانچر محیط مجازی می‌سازد، وابستگی‌ها را نصب می‌کند، اگر `config.json` وجود نداشته باشد setup wizard را اجرا می‌کند، و سپس پراکسی را بالا می‌آورد. +لانچر کارهای زیر را انجام می‌دهد: +- ساخت `.venv` +- نصب وابستگی‌ها +- اجرای setup wizard اگر `config.json` موجود نباشد +- اجرای پراکسی + +--- -## 4. تنظیم مرورگر +## 4) تکمیل Setup Wizard -مرورگر را روی پراکسی زیر تنظیم کنید: +در wizard: +1. `auth_key` دقیقا برابر `AUTH_KEY` در Apps Script باشد +2. `script_id` همان Deployment ID شما باشد +3. پورت HTTP را `8085` بگذارید (مگر اینکه اشغال باشد) +4. LAN sharing را فقط وقتی لازم دارید روشن کنید + +در پایان فایل `config.json` ساخته می‌شود. + +--- + +## 5) تنظیم پراکسی در مرورگر | گزینه | مقدار | -|-------|-------| +|---|---| | نوع پراکسی | HTTP | | آدرس | `127.0.0.1` | | پورت | `8085` | -| SOCKS5، اختیاری | `127.0.0.1:1080` | -برای HTTPS اگر مرورگر خطای گواهی داد، فایل `ca/ca.crt` را به عنوان trusted root نصب کنید و مرورگر را کامل ببندید و دوباره باز کنید. +در Firefox: Settings → General → Network Settings → Manual proxy +و برای HTTPS هم فعال کنید. + +--- + +## 6) نصب CA محلی (برای HTTPS ضروری) + +فایل گواهی در `ca/ca.crt` ساخته می‌شود. +اگر نصب خودکار انجام نشد، دستی نصب کنید. + +### Windows +1. فایل `ca/ca.crt` را باز کنید +2. Install Certificate +3. Current User +4. ذخیره در **Trusted Root Certification Authorities** +5. مرورگر را کامل ببندید و دوباره باز کنید + +### macOS +1. `ca/ca.crt` را در Keychain Access باز کنید +2. بخش Trust را باز کنید +3. روی **Always Trust** بگذارید +4. مرورگر را ری‌استارت کنید + +### Ubuntu / Debian + +```bash +sudo cp ca/ca.crt /usr/local/share/ca-certificates/masterhttp-relay.crt +sudo update-ca-certificates +``` + +مرورگر را ری‌استارت کنید. + +### Firefox (در صورت نیاز) +ممکن است Firefox store جداگانه داشته باشد: +- Settings → Privacy & Security → Certificates → View Certificates → Authorities → Import `ca/ca.crt` +- گزینه اعتماد برای شناسایی وب‌سایت را فعال کنید + +--- + +## 7) تست عملکرد + +- چند سایت عادی باز کنید. +- اگر `unauthorized` دیدید: `AUTH_KEY` ناهماهنگ است. +- اگر خطای گواهی HTTPS دیدید: CA درست نصب نشده. + +--- + +## 8) دستورهای بررسی کیفیت مسیر + +### اسکن سریع دسترسی IP + +```bash +python main.py --scan +``` + +IP پیشنهادی را در `config.json` قرار دهید. + +### اسکن تطبیقیِ پایداری‌محور (پیشنهادی برای شبکه ناپایدار) + +```bash +python main.py --adaptive-scan +``` + +این اسکن فقط روی کمترین پینگ تصمیم نمی‌گیرد و پایداری را هم در نظر می‌گیرد. + +--- + +## 9) اجرای دستی (بدون لانچر) + +### Windows + +```cmd +python -m venv .venv +.venv\Scripts\python -m pip install -r requirements.txt +.venv\Scripts\python setup.py +.venv\Scripts\python main.py +``` + +### Linux / macOS + +```bash +python3 -m venv .venv +.venv/bin/python -m pip install -r requirements.txt +.venv/bin/python setup.py +.venv/bin/python main.py +``` + +--- + +## 10) مشکلات رایج (خلاصه) -## 5. بررسی سریع +- `unauthorized`: عدم تطابق auth key +- اتصال پراکسی برقرار است ولی سایت‌ها باز نمی‌شوند: Deployment ID اشتباه یا دسترسی Web app درست نیست +- خطای HTTPS: گواهی CA نصب/Trusted نشده +- برخی سرویس‌ها خروجی Google را می‌بندند: از Exit Node استفاده کنید -- اگر `unauthorized` دیدید، مقدار `AUTH_KEY` در Apps Script باید دقیقا با `auth_key` در `config.json` یکی باشد. -- اگر صفحه‌ها باز نمی‌شوند، [رفع مشکل](TROUBLESHOOTING.md) را ببینید. -- اگر سرعت پایین است، دستور `python main.py --scan` را اجرا کنید و IP پیشنهادی را در `config.json` بگذارید. +--- -## قدم بعدی +## صفحه‌های بعدی -برای همه گزینه‌های تنظیمات، [مرجع تنظیمات](CONFIGURATION.md) را بخوانید. برای مسیرهای خاص مثل ChatGPT یا Turnstile، [راهنمای Exit Node](../exit-node/EXIT_NODE_DEPLOYMENT_FA.md) را ببینید. +- [رفع مشکل](TROUBLESHOOTING.md) +- [مرجع تنظیمات](CONFIGURATION.md) +- [راهنمای Exit Node](../exit-node/EXIT_NODE_DEPLOYMENT_FA.md) +- [معماری](ARCHITECTURE.md) diff --git a/main.py b/main.py index 76e7370..268c16f 100644 --- a/main.py +++ b/main.py @@ -24,6 +24,7 @@ from core.constants import __version__ from core.lan_utils import log_lan_access from core.google_ip_scanner import scan_sync +from core.adaptive_transport import AdaptiveRouteEngine, ProbeTarget from core.logging_utils import configure as configure_logging, print_banner from proxy.mitm import CA_CERT_FILE from proxy.proxy_server import ProxyServer @@ -99,6 +100,11 @@ def parse_args(): action="store_true", help="Scan Google IPs to find the fastest reachable one and exit.", ) + parser.add_argument( + "--adaptive-scan", + action="store_true", + help="Run adaptive transport scan (jitter/loss/stability-first) and exit.", + ) return parser.parse_args() @@ -224,6 +230,24 @@ def main(): print("Deploy the Apps Script from Code.gs and paste the Deployment ID.") sys.exit(1) + + if args.adaptive_scan: + configure_logging("INFO") + from core.constants import CANDIDATE_IPS + engine = AdaptiveRouteEngine(config.get("route_db", "route_intel.sqlite3")) + targets = [ + ProbeTarget(ip=ip, port=443, sni=config.get("front_domain", "www.google.com"), transport_profile="vless_reality") + for ip in CANDIDATE_IPS + ] + ranked = asyncio.run(engine.evaluate(targets)) + if not ranked: + print("No viable adaptive routes found") + sys.exit(1) + print("Adaptive route ranking (stability-first):") + for i, route in enumerate(ranked[:5], 1): + print(f"{i}. {route.target.ip} score={route.score:.4f} median={route.median_rtt_ms:.1f}ms jitter={route.jitter_ms:.1f} loss={route.packet_loss:.2%} handshake={route.handshake_success_rate:.2%}") + sys.exit(0) + # ── Google IP Scanner ────────────────────────────────────────────────── if args.scan: configure_logging("INFO") diff --git a/setup.py b/setup.py index 0bb2f48..e82a75c 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,8 @@ import sys from pathlib import Path +from src.core.config_sync import ConfigSyncManager, ConfigSyncError + HERE = Path(__file__).resolve().parent CONFIG_PATH = HERE / "config.json" EXAMPLE_PATH = HERE / "config.example.json" @@ -159,38 +161,80 @@ def write_config(cfg: dict) -> None: f.write("\n") -def main() -> int: - print() - print(bold("MasterHttpRelayVPN - setup wizard")) - print(dim("Answer a few questions and we'll write config.json for you.")) +def _sync_runtime(cfg: dict) -> None: + sync = ConfigSyncManager(HERE) + artifacts = sync.sync_all_configs(cfg) + (HERE / "apps_script" / "Code.gs").write_text(artifacts["apps_script/Code.gs"], encoding="utf-8") + (HERE / "apps_script" / "cloudflare_worker.js").write_text(artifacts["apps_script/cloudflare_worker.js"], encoding="utf-8") + CONFIG_PATH.write_text(artifacts["config.json"], encoding="utf-8") + +def _load_existing_or_base() -> dict: if CONFIG_PATH.exists(): - if not prompt_yes_no("config.json already exists. Overwrite?", default=False): - print(dim("Nothing changed.")) - return 0 + with CONFIG_PATH.open(encoding="utf-8") as f: + return json.load(f) + return load_base_config() - cfg = load_base_config() - suggested_key = random_auth_key() +def _edit_existing(cfg: dict) -> dict: print() - print(bold("Shared password (auth_key)")) - print(dim(" Must match AUTH_KEY inside apps_script/Code.gs.")) - cfg["auth_key"] = prompt("auth_key", default=suggested_key) - + print(bold("Edit existing config")) + cfg["auth_key"] = prompt("auth_key", default=str(cfg.get("auth_key", random_auth_key()))) cfg = configure_apps_script(cfg) cfg = configure_network(cfg) + return cfg - write_config(cfg) +def main_menu() -> str: print() - print(green(f"[OK] wrote {CONFIG_PATH.name}")) - print() - print(bold("Next step:")) - print(f" python main.py") + print(bold("Main Menu")) + print(" 1) Create new config") + print(" 2) Edit existing config") + print(" 3) Validate deployments") + print(" 4) Sync configs to all files") + print(" 5) View quota status") + print(" 6) Advanced settings") + print(" 7) Exit") + return prompt("Select option", default="7") + + +def main() -> int: print() - print(yellow("Reminder: the AUTH_KEY inside apps_script/Code.gs must match the auth_key")) - print(yellow("you just entered - otherwise the relay will return 'unauthorized'.")) - return 0 + print(bold("MasterHttpRelayVPN - setup wizard")) + print(dim("Answer a few questions and we'll write config.json for you.")) + + cfg = _load_existing_or_base() + while True: + choice = main_menu() + if choice == "1": + cfg = load_base_config() + cfg["auth_key"] = prompt("auth_key", default=random_auth_key()) + cfg = configure_apps_script(cfg) + cfg = configure_network(cfg) + write_config(cfg) + _sync_runtime(cfg) + print(green("[OK] created and synchronized config/runtime artifacts")) + elif choice == "2": + cfg = _edit_existing(cfg) + write_config(cfg) + _sync_runtime(cfg) + print(green("[OK] edited and synchronized config/runtime artifacts")) + elif choice == "3": + valid = bool(cfg.get("script_id") or cfg.get("script_ids")) + print(green("[OK] deployment configuration looks present") if valid else red("[FAIL] missing script_id/script_ids")) + elif choice == "4": + try: + _sync_runtime(cfg) + print(green("[OK] sync complete")) + except ConfigSyncError as exc: + print(red(f"[FAIL] sync failed: {exc}")) + elif choice == "5": + print(yellow("[WARN] quota dashboard not yet implemented in setup.py")) + elif choice == "6": + print(dim("Advanced settings are edited through option #2 in this release.")) + elif choice == "7": + print(dim("Done.")) + return 0 if __name__ == "__main__": diff --git a/src/core/adaptive_transport/__init__.py b/src/core/adaptive_transport/__init__.py new file mode 100644 index 0000000..2e7b603 --- /dev/null +++ b/src/core/adaptive_transport/__init__.py @@ -0,0 +1,11 @@ +from .engine import AdaptiveRouteEngine, AdaptiveRouteConfig +from .models import ProbeTarget, RouteScore +from .storage import RouteIntelligenceStore + +__all__ = [ + "AdaptiveRouteEngine", + "AdaptiveRouteConfig", + "ProbeTarget", + "RouteScore", + "RouteIntelligenceStore", +] diff --git a/src/core/adaptive_transport/engine.py b/src/core/adaptive_transport/engine.py new file mode 100644 index 0000000..78258f8 --- /dev/null +++ b/src/core/adaptive_transport/engine.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass + +from .hygiene import validate_public_ip +from .models import ProbeTarget, RouteScore, RuntimeMetrics, SessionState +from .probe import AsyncRouteProbe, summarize +from .storage import RouteIntelligenceStore + +logger = logging.getLogger(__name__) + + +@dataclass(slots=True) +class AdaptiveRouteConfig: + min_concurrency: int = 1 + max_concurrency: int = 16 + sticky_session_s: float = 180.0 + switch_guard_s: float = 30.0 + circuit_breaker_failures: int = 4 + + +class AdaptiveRouteEngine: + def __init__(self, db_path: str, cfg: AdaptiveRouteConfig | None = None): + self.cfg = cfg or AdaptiveRouteConfig() + self.store = RouteIntelligenceStore(db_path) + self.probe = AsyncRouteProbe() + self._active_route: RouteScore | None = None + self._active_until = 0.0 + self._failure_counts: dict[str, int] = {} + self._session = SessionState() + self._scan_tasks: set[asyncio.Task] = set() + self._stopped = False + + async def evaluate(self, targets: list[ProbeTarget], cancel_event: asyncio.Event | None = None) -> list[RouteScore]: + if self._stopped: + return [] + ordered_targets = sorted(targets, key=self._route_key) + concurrency = max(1, min(self.cfg.max_concurrency, max(self.cfg.min_concurrency, len(ordered_targets)))) + sem = asyncio.Semaphore(concurrency) + results: list[RouteScore] = [] + + async def worker(t: ProbeTarget): + validate_public_ip(t.ip) + if self._stopped or (cancel_event and cancel_event.is_set()): + return + async with sem: + if self._stopped or (cancel_event and cancel_event.is_set()): + return + samples = await self.probe.probe(t, include_quic=t.transport_profile == "quic") + med, jit, loss, hs, stable = summarize(samples) + score_value = (stable * 0.45) + ((1.0 - min(1.0, loss)) * 0.25) + (hs * 0.20) + (1.0 / (1.0 + jit + med / 100.0) * 0.10) + score = RouteScore(t, med, jit, loss, hs, stable, score_value) + results.append(score) + logger.info( + "route_score_breakdown", + extra={ + "route": self._route_key(t), + "median_rtt_ms": med, + "jitter_ms": jit, + "packet_loss": loss, + "handshake_success_rate": hs, + "session_stability": stable, + "score": score_value, + }, + ) + await self.store.record_score(score) + + self._scan_tasks = {asyncio.create_task(worker(t)) for t in ordered_targets} + try: + await asyncio.gather(*self._scan_tasks, return_exceptions=False) + finally: + self._scan_tasks.clear() + return sorted(results, key=lambda r: (r.score, -r.packet_loss, -r.jitter_ms, -r.median_rtt_ms, self._route_key(r.target)), reverse=True) + + async def select_route(self, candidates: list[RouteScore], gameplay_active: bool) -> RouteScore | None: + if self._stopped: + return None + now = time.time() + if not candidates: + logger.info("route_rejected", extra={"reason": "no_candidates", "selected": self._route_key(self._active_route.target) if self._active_route else None}) + return self._active_route + best = candidates[0] + if self._active_route and gameplay_active and not self._hard_failure(self._active_route.target): + self._session.state = "session_active" + logger.info("session_transition", extra={"transition": "session_active", "route": self._route_key(self._active_route.target)}) + logger.info("route_rejected", extra={"reason": "session_active_locked", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)}) + return self._active_route + if ( + self._active_route + and not self._hard_failure(self._active_route.target) + and now - self._active_route.sampled_at < self.cfg.switch_guard_s + ): + logger.info( + "route_rejected", + extra={ + "reason": "switch_guard", + "selected": self._route_key(self._active_route.target), + "candidate": self._route_key(best.target), + }, + ) + return self._active_route + if self._active_route and not self._hard_failure(self._active_route.target): + logger.info("route_rejected", extra={"reason": "active_route_not_failed", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)}) + return self._active_route + self._active_route = best + self._active_until = now + self.cfg.sticky_session_s + self._session.state = "session_stable_window" if gameplay_active else "session_start" + self._session.stable_since = now + logger.info("session_transition", extra={"transition": self._session.state, "route": self._route_key(best.target)}) + logger.info("route_selected", extra={"route": self._route_key(best.target), "score": best.score, "median_rtt_ms": best.median_rtt_ms, "jitter_ms": best.jitter_ms, "packet_loss": best.packet_loss, "handshake": best.handshake_success_rate, "stability": best.session_stability}) + return best + + async def record_runtime_metrics(self, target: ProbeTarget, metrics: RuntimeMetrics) -> None: + await self.store.record_runtime_metrics(target, metrics) + + def register_route_failure(self, target: ProbeTarget) -> bool: + key = self._route_key(target) + self._failure_counts[key] = self._failure_counts.get(key, 0) + 1 + logger.info("session_transition", extra={"transition": "session_fail", "route": key, "failure_count": self._failure_counts[key]}) + return self._hard_failure(target) + + def bound_transport_route(self) -> ProbeTarget | None: + return self._active_route.target if self._active_route else None + + async def shutdown(self) -> None: + self._stopped = True + for task in list(self._scan_tasks): + task.cancel() + if self._scan_tasks: + await asyncio.gather(*self._scan_tasks, return_exceptions=True) + self._scan_tasks.clear() + + def _route_key(self, target: ProbeTarget) -> str: + return f"{target.ip}:{target.port}:{target.sni}:{target.transport_profile}" + + def _hard_failure(self, target: ProbeTarget) -> bool: + return self._failure_counts.get(self._route_key(target), 0) >= self.cfg.circuit_breaker_failures diff --git a/src/core/adaptive_transport/hygiene.py b/src/core/adaptive_transport/hygiene.py new file mode 100644 index 0000000..5182efc --- /dev/null +++ b/src/core/adaptive_transport/hygiene.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import ipaddress + + +def validate_public_ip(ip: str) -> str: + try: + addr = ipaddress.ip_address(ip) + except ValueError as exc: + raise ValueError(f"invalid ip: {ip}") from exc + + if ( + addr.is_private + or addr.is_multicast + or addr.is_loopback + or addr.is_reserved + or addr.is_unspecified + or addr.is_link_local + ): + raise ValueError(f"non-public ip rejected: {ip}") + return ip diff --git a/src/core/adaptive_transport/models.py b/src/core/adaptive_transport/models.py new file mode 100644 index 0000000..56bc159 --- /dev/null +++ b/src/core/adaptive_transport/models.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Literal +import time + +ProbeKind = Literal["tcp_syn", "tls", "h2_preface", "quic"] + + +@dataclass(slots=True, frozen=True) +class ProbeTarget: + ip: str + port: int + sni: str + alpn: tuple[str, ...] = ("h2", "http/1.1") + transport_profile: str = "vless_reality" + + +@dataclass(slots=True) +class ProbeObservation: + kind: ProbeKind + ok: bool + latency_ms: float + packet_loss: float = 0.0 + + +@dataclass(slots=True) +class RouteScore: + target: ProbeTarget + median_rtt_ms: float + jitter_ms: float + packet_loss: float + handshake_success_rate: float + session_stability: float + score: float + sampled_at: float = field(default_factory=time.time) + + +@dataclass(slots=True) +class RuntimeMetrics: + disconnects: int = 0 + retransmissions: int = 0 + latency_spikes: int = 0 + packet_delay_variance: float = 0.0 + + +@dataclass(slots=True) +class SessionState: + state: Literal["session_start", "session_active", "session_stable_window"] = "session_start" + started_at: float = field(default_factory=time.time) + stable_since: float | None = None diff --git a/src/core/adaptive_transport/probe.py b/src/core/adaptive_transport/probe.py new file mode 100644 index 0000000..3cfb5b6 --- /dev/null +++ b/src/core/adaptive_transport/probe.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +import asyncio +import socket +import ssl +import statistics +import time +from dataclasses import dataclass + +from .models import ProbeObservation, ProbeTarget + + +@dataclass(slots=True) +class ProbeConfig: + timeout_s: float = 2.5 + retries: int = 3 + + +class AsyncRouteProbe: + def __init__(self, cfg: ProbeConfig | None = None): + self.cfg = cfg or ProbeConfig() + + async def probe(self, target: ProbeTarget, include_quic: bool = False) -> list[ProbeObservation]: + samples: list[ProbeObservation] = [] + for _ in range(self.cfg.retries): + samples.append(await self._tcp_syn_probe(target)) + samples.append(await self._tls_probe(target)) + samples.append(await self._h2_preface_probe(target)) + if include_quic: + samples.append(await self._quic_probe(target)) + return samples + + async def _tcp_syn_probe(self, target: ProbeTarget) -> ProbeObservation: + start = time.perf_counter() + ok = False + writer = None + try: + fut = asyncio.open_connection(target.ip, target.port) + _, writer = await asyncio.wait_for(fut, timeout=self.cfg.timeout_s) + ok = True + except Exception: + ok = False + finally: + if writer is not None: + writer.close() + await writer.wait_closed() + return ProbeObservation("tcp_syn", ok, (time.perf_counter() - start) * 1000) + + async def _tls_probe(self, target: ProbeTarget) -> ProbeObservation: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_alpn_protocols(list(target.alpn)) + start = time.perf_counter() + writer = None + ok = False + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(target.ip, target.port, ssl=ctx, server_hostname=target.sni), + timeout=self.cfg.timeout_s, + ) + ssl_obj = writer.get_extra_info("ssl_object") + ok = bool(reader and ssl_obj and ssl_obj.version()) + except Exception: + ok = False + finally: + if writer is not None: + writer.close() + await writer.wait_closed() + return ProbeObservation("tls", ok, (time.perf_counter() - start) * 1000) + + async def _h2_preface_probe(self, target: ProbeTarget) -> ProbeObservation: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_alpn_protocols(["h2"]) + start = time.perf_counter() + writer = None + reader = None + ok = False + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(target.ip, target.port, ssl=ctx, server_hostname=target.sni), + timeout=self.cfg.timeout_s, + ) + ssl_obj = writer.get_extra_info("ssl_object") + if not ssl_obj or ssl_obj.selected_alpn_protocol() != "h2": + raise RuntimeError("h2 alpn not negotiated") + writer.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + await writer.drain() + frame_header = await asyncio.wait_for(reader.readexactly(9), timeout=0.4) + ok = len(frame_header) == 9 + except Exception: + ok = False + finally: + if writer is not None: + writer.close() + await writer.wait_closed() + return ProbeObservation("h2_preface", ok, (time.perf_counter() - start) * 1000) + + async def _quic_probe(self, target: ProbeTarget) -> ProbeObservation: + loop = asyncio.get_running_loop() + start = time.perf_counter() + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_socket.setblocking(False) + ok = False + try: + udp_socket.connect((target.ip, target.port)) + initial = bytes.fromhex("c300000001088394c8f03e5157080000449e00000002") + await loop.sock_sendall(udp_socket, initial) + response = await asyncio.wait_for(loop.sock_recv(udp_socket, 1200), timeout=min(self.cfg.timeout_s, 0.5)) + ok = bool(response) + except Exception: + ok = False + finally: + udp_socket.close() + return ProbeObservation("quic", ok, (time.perf_counter() - start) * 1000) + + +def summarize(samples: list[ProbeObservation]) -> tuple[float, float, float, float, float]: + lat = [s.latency_ms for s in samples if s.ok] + if not lat: + return 9_999.0, 5_000.0, 1.0, 0.0, 0.0 + median = statistics.median(lat) + jitter = statistics.pstdev(lat) if len(lat) > 1 else 0.0 + loss = 1.0 - (len(lat) / max(1, len(samples))) + handshake_success = len([s for s in samples if s.kind == "tls" and s.ok]) / max(1, len([s for s in samples if s.kind == "tls"])) + stability = 1.0 / (1.0 + jitter + (loss * 100.0)) + return median, jitter, loss, handshake_success, stability diff --git a/src/core/adaptive_transport/storage.py b/src/core/adaptive_transport/storage.py new file mode 100644 index 0000000..2560b82 --- /dev/null +++ b/src/core/adaptive_transport/storage.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import asyncio +import sqlite3 +import time +from pathlib import Path + +from .models import ProbeTarget, RouteScore, RuntimeMetrics + + +class RouteIntelligenceStore: + def __init__(self, path: str): + self.path = Path(path) + self._lock = asyncio.Lock() + self._init_db() + + def _init_db(self) -> None: + with sqlite3.connect(self.path) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS route_scores ( + ip TEXT NOT NULL, + port INTEGER NOT NULL, + sni TEXT NOT NULL, + profile TEXT NOT NULL, + sampled_at REAL NOT NULL, + median_rtt_ms REAL NOT NULL, + jitter_ms REAL NOT NULL, + packet_loss REAL NOT NULL, + handshake_success REAL NOT NULL, + session_stability REAL NOT NULL, + score REAL NOT NULL, + success_count INTEGER NOT NULL DEFAULT 0, + failure_count INTEGER NOT NULL DEFAULT 0, + cooldown_until REAL NOT NULL DEFAULT 0, + retired INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS route_runtime_metrics ( + ip TEXT NOT NULL, + port INTEGER NOT NULL, + sni TEXT NOT NULL, + profile TEXT NOT NULL, + observed_at REAL NOT NULL, + disconnects INTEGER NOT NULL, + retransmissions INTEGER NOT NULL, + latency_spikes INTEGER NOT NULL, + packet_delay_variance REAL NOT NULL + ) + """ + ) + + async def record_score(self, score: RouteScore) -> None: + async with self._lock: + await asyncio.to_thread(self._record_score_sync, score) + + def _record_score_sync(self, score: RouteScore) -> None: + with sqlite3.connect(self.path, timeout=30) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + """INSERT INTO route_scores + (ip,port,sni,profile,sampled_at,median_rtt_ms,jitter_ms,packet_loss,handshake_success,session_stability,score,success_count,failure_count) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""", + ( + score.target.ip, + score.target.port, + score.target.sni, + score.target.transport_profile, + score.sampled_at, + score.median_rtt_ms, + score.jitter_ms, + score.packet_loss, + score.handshake_success_rate, + score.session_stability, + score.score, + 1 if score.score > 0 else 0, + 0 if score.score > 0 else 1, + ), + ) + + async def record_runtime_metrics(self, target: ProbeTarget, metrics: RuntimeMetrics, observed_at: float | None = None) -> None: + async with self._lock: + await asyncio.to_thread(self._record_runtime_metrics_sync, target, metrics, observed_at or time.time()) + + def _record_runtime_metrics_sync(self, target: ProbeTarget, metrics: RuntimeMetrics, observed_at: float) -> None: + with sqlite3.connect(self.path, timeout=30) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + """INSERT INTO route_runtime_metrics + (ip,port,sni,profile,observed_at,disconnects,retransmissions,latency_spikes,packet_delay_variance) + VALUES (?,?,?,?,?,?,?,?,?)""", + (target.ip, target.port, target.sni, target.transport_profile, observed_at, metrics.disconnects, metrics.retransmissions, metrics.latency_spikes, metrics.packet_delay_variance), + ) + + async def top_routes(self, limit: int = 5, decay_window_s: float = 900.0) -> list[tuple[str, int, str, str, float]]: + now = time.time() + async with self._lock: + return await asyncio.to_thread(self._top_routes_sync, limit, now, decay_window_s) + + def _top_routes_sync(self, limit: int, now: float, decay_window_s: float): + with sqlite3.connect(self.path) as conn: + rows = conn.execute( + """SELECT s.ip,s.port,s.sni,s.profile, + AVG( + s.score * CASE + WHEN (? - s.sampled_at) >= ? THEN 0.05 + ELSE EXP(-((? - s.sampled_at) / ?)) + END + ) + - COALESCE(AVG(ABS(s.score - ss.mean_score)), 0.0) * 0.35 + - COALESCE(SUM(m.disconnects + m.retransmissions + m.latency_spikes) * 0.02, 0.0) + - COALESCE(AVG(m.packet_delay_variance) * 0.01, 0.0) AS decayed + FROM route_scores s + LEFT JOIN ( + SELECT ip,port,sni,profile,AVG(score) AS mean_score FROM route_scores GROUP BY ip,port,sni,profile + ) ss ON ss.ip=s.ip AND ss.port=s.port AND ss.sni=s.sni AND ss.profile=s.profile + LEFT JOIN route_runtime_metrics m ON m.ip=s.ip AND m.port=s.port AND m.sni=s.sni AND m.profile=s.profile + WHERE s.retired = 0 AND s.cooldown_until < ? + GROUP BY s.ip,s.port,s.sni,s.profile + ORDER BY decayed DESC + LIMIT ?""", + (now, max(1.0, decay_window_s), now, max(1.0, decay_window_s), now, max(1.0, decay_window_s), now, limit), + ).fetchall() + return rows diff --git a/src/core/adaptive_transport/transport_profiles.py b/src/core/adaptive_transport/transport_profiles.py new file mode 100644 index 0000000..2e4eabb --- /dev/null +++ b/src/core/adaptive_transport/transport_profiles.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class TransportProfile: + name: str + protocol: str + fallback: tuple[str, ...] + browser_fingerprint: str + + +PROFILES = { + "vless_reality": TransportProfile("vless_reality", "tcp+tls", ("h2", "h3", "udp_over_tcp"), "chrome_124"), + "http2_fallback": TransportProfile("http2_fallback", "h2", ("h1",), "firefox_125"), + "http3_fallback": TransportProfile("http3_fallback", "h3", ("h2",), "chrome_124"), + "udp_over_tcp": TransportProfile("udp_over_tcp", "tcp", ("h2",), "safari_17"), + "quic": TransportProfile("quic", "udp", ("h3", "h2"), "chrome_124"), +} diff --git a/src/core/config_sync.py b/src/core/config_sync.py new file mode 100644 index 0000000..64a1e03 --- /dev/null +++ b/src/core/config_sync.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import hashlib +import json +from pathlib import Path + + +class ConfigSyncError(RuntimeError): + pass + + +class ConfigSyncManager: + def __init__(self, repo_root: Path): + self.repo_root = repo_root + self.config_path = repo_root / "config.json" + self.code_gs_path = repo_root / "apps_script" / "Code.gs" + self.worker_path = repo_root / "apps_script" / "cloudflare_worker.js" + self.state_path = repo_root / ".config_sync_state.json" + + def sync_all_configs(self, config: dict) -> dict: + if not isinstance(config, dict): + raise ConfigSyncError("config must be a JSON object") + script_ids = config.get("script_ids") or config.get("script_id") + if not script_ids: + raise ConfigSyncError("missing script_id/script_ids") + auth_key = str(config.get("auth_key", "")).strip() + if not auth_key: + raise ConfigSyncError("missing auth_key") + + artifacts = { + "config.json": self._normalized_config(config), + "apps_script/Code.gs": self._sync_code_gs(auth_key), + "apps_script/cloudflare_worker.js": self._sync_worker_psk(config), + } + self._write_sync_state(artifacts) + return artifacts + + def _normalized_config(self, config: dict) -> str: + return json.dumps(config, ensure_ascii=False, indent=2) + "\n" + + def _sync_code_gs(self, auth_key: str) -> str: + content = self.code_gs_path.read_text(encoding="utf-8") + marker = 'const AUTH_KEY = "' + idx = content.find(marker) + if idx < 0: + raise ConfigSyncError("AUTH_KEY declaration not found in Code.gs") + end = content.find('";', idx + len(marker)) + if end < 0: + raise ConfigSyncError("AUTH_KEY declaration malformed in Code.gs") + return content[: idx + len(marker)] + auth_key + content[end:] + + def _sync_worker_psk(self, config: dict) -> str: + content = self.worker_path.read_text(encoding="utf-8") + psk = str(config.get("exit_node", {}).get("psk") or config.get("auth_key", "")).strip() + marker = 'const PSK = "' + idx = content.find(marker) + if idx < 0: + raise ConfigSyncError("PSK declaration not found in cloudflare_worker.js") + end = content.find('";', idx + len(marker)) + if end < 0: + raise ConfigSyncError("PSK declaration malformed in cloudflare_worker.js") + return content[: idx + len(marker)] + psk + content[end:] + + def _write_sync_state(self, artifacts: dict[str, str]) -> None: + checksums = {k: hashlib.sha256(v.encode("utf-8")).hexdigest() for k, v in artifacts.items()} + self.state_path.write_text(json.dumps({"checksums": checksums}, indent=2) + "\n", encoding="utf-8") diff --git a/src/core/dns_cache.py b/src/core/dns_cache.py new file mode 100644 index 0000000..e43952e --- /dev/null +++ b/src/core/dns_cache.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass + + +@dataclass(slots=True) +class DnsEntry: + families: list[tuple[int, str]] + expires_at: float + + +class DnsCache: + def __init__(self, ttl_s: float = 120.0, max_items: int = 4096): + self.ttl_s = max(1.0, ttl_s) + self.max_items = max(64, max_items) + self._cache: dict[str, DnsEntry] = {} + + def get(self, host: str) -> list[tuple[int, str]] | None: + item = self._cache.get(host) + now = time.time() + if not item: + return None + if item.expires_at <= now: + self._cache.pop(host, None) + return None + return list(item.families) + + def set(self, host: str, families: list[tuple[int, str]]) -> None: + if len(self._cache) >= self.max_items: + # bounded memory: evict oldest inserted key + oldest = next(iter(self._cache.keys()), None) + if oldest is not None: + self._cache.pop(oldest, None) + self._cache[host] = DnsEntry(families=list(families), expires_at=time.time() + self.ttl_s) diff --git a/src/core/google_ip_scanner.py b/src/core/google_ip_scanner.py index 9a17c5a..96dc30a 100644 --- a/src/core/google_ip_scanner.py +++ b/src/core/google_ip_scanner.py @@ -9,7 +9,10 @@ from __future__ import annotations import asyncio +import contextlib +import json import logging +import random import ssl import time from dataclasses import dataclass @@ -32,6 +35,84 @@ def ok(self) -> bool: return self.latency_ms is not None +def _classify_fronted_failure(response: bytes, expected_host: str) -> str | None: + text = response.decode("utf-8", errors="ignore") + lower = text.lower() + if " ProbeResult: + base_delay = 0.2 + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_alpn_protocols(["h2", "http/1.1"]) + start_time = time.time() + last_error = "probe_failed" + for attempt in range(retries): + writer = None + try: + # TLS warm-up + relay-aligned TLS path to front domain with explicit IPv4 target. + reader, writer = await asyncio.wait_for( + asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=sni), + timeout=timeout, + ) + ssl_obj = writer.get_extra_info("ssl_object") + if not ssl_obj: + return ProbeResult(ip=ip, error="tls_warmup_failed") + if ssl_obj.selected_alpn_protocol() == "h2": + writer.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + await writer.drain() + head = await asyncio.wait_for(reader.read(256), timeout=timeout) + else: + req = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n".encode() + writer.write(req) + await writer.drain() + head = await asyncio.wait_for(reader.read(512), timeout=timeout) + failure = _classify_fronted_failure(head, sni) + if failure: + return ProbeResult(ip=ip, error=failure) + if not head: + return ProbeResult(ip=ip, error="empty_response") + return ProbeResult(ip=ip, latency_ms=int((time.time() - start_time) * 1000)) + except asyncio.TimeoutError: + last_error = "timeout" + except OSError as e: + last_error = f"network_error:{e.strerror or str(e)}" + except Exception as e: + last_error = f"probe_failed:{type(e).__name__}" + finally: + if writer is not None: + writer.close() + with contextlib.suppress(Exception): + await writer.wait_closed() + if attempt < retries - 1: + await asyncio.sleep((base_delay * (2**attempt)) + random.uniform(0.0, 0.075)) + return ProbeResult(ip=ip, error=last_error) + + async def _probe_ip( ip: str, sni: str, @@ -51,60 +132,7 @@ async def _probe_ip( ProbeResult with latency_ms (if successful) or error message. """ async with semaphore: - start_time = time.time() - try: - # Create SSL context that skips certificate verification - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - - # Connect to IP:443 with SNI set to the fronting domain - reader, writer = await asyncio.wait_for( - asyncio.open_connection( - ip, - 443, - ssl=ctx, - server_hostname=sni, - ), - timeout=timeout, - ) - - # Send minimal HTTP HEAD request - request = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n" - writer.write(request.encode()) - await writer.drain() - - # Read response header (first 256 bytes is plenty for HTTP status) - response = await asyncio.wait_for(reader.read(256), timeout=timeout) - - writer.close() - try: - await writer.wait_closed() - except Exception: - pass - - # Check if we got an HTTP response - if not response: - return ProbeResult(ip=ip, error="empty response") - - response_str = response.decode("utf-8", errors="ignore") - if not response_str.startswith("HTTP/"): - return ProbeResult(ip=ip, error=f"invalid response: {response_str[:30]!r}") - - # Success — return latency in milliseconds - elapsed_ms = int((time.time() - start_time) * 1000) - return ProbeResult(ip=ip, latency_ms=elapsed_ms) - - except asyncio.TimeoutError: - return ProbeResult(ip=ip, error="timeout") - except ConnectionRefusedError: - return ProbeResult(ip=ip, error="connection refused") - except ConnectionResetError: - return ProbeResult(ip=ip, error="connection reset") - except OSError as e: - return ProbeResult(ip=ip, error=f"network error: {e.strerror or str(e)}") - except Exception as e: - return ProbeResult(ip=ip, error=f"probe failed: {type(e).__name__}") + return await probe_via_fronted_h2(ip=ip, sni=sni, timeout=timeout) async def run(front_domain: str) -> bool: diff --git a/src/core/logging_utils.py b/src/core/logging_utils.py index 8f3df11..c2c1c5b 100644 --- a/src/core/logging_utils.py +++ b/src/core/logging_utils.py @@ -187,7 +187,8 @@ def format(self, record: logging.LogRecord) -> str: if self.use_color: time_part = f"{DIM}{FG_GRAY}{time_part}{RESET}" - line = f"{time_part} {level_part} {comp_part} {message}" + event_meta = _event_metadata(record) + line = f"{time_part} {level_part} {comp_part} {message}{event_meta}" # Exception tracebacks: render dimmed below the main line. if record.exc_info: @@ -204,6 +205,23 @@ def format(self, record: logging.LogRecord) -> str: return line +def _event_metadata(record: logging.LogRecord) -> str: + """Render common structured metadata fields when present.""" + fields = [] + for key in ( + "request_type", + "status_code", + "success", + "quota_state", + "relay_state", + ): + if hasattr(record, key): + fields.append(f"{key}={getattr(record, key)}") + if not fields: + return "" + return " {" + ", ".join(fields) + "}" + + # ─── public API ──────────────────────────────────────────────────────────── def configure(level: str = "INFO", *, stream=None) -> None: diff --git a/src/core/quota_monitor.py b/src/core/quota_monitor.py new file mode 100644 index 0000000..0be51f5 --- /dev/null +++ b/src/core/quota_monitor.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +import time + + +@dataclass(slots=True) +class ScriptQuotaStat: + success: int = 0 + failure: int = 0 + recent_errors: list[str] = field(default_factory=list) + + def state(self) -> str: + if any("quota" in e.lower() for e in self.recent_errors[-5:]): + return "WARN" + if self.failure > (self.success * 2 + 3): + return "DEAD" + return "GOOD" + + +class QuotaMonitor: + def __init__(self): + self._stats: dict[str, ScriptQuotaStat] = {} + self._events: dict[str, list[tuple[float, bool]]] = {} + self._cooldown_until: dict[str, float] = {} + + def record(self, script_id: str, ok: bool, error: str | None = None) -> None: + stat = self._stats.setdefault(script_id, ScriptQuotaStat()) + self._events.setdefault(script_id, []).append((time.time(), ok)) + if len(self._events[script_id]) > 2000: + self._events[script_id] = self._events[script_id][-2000:] + if ok: + stat.success += 1 + else: + stat.failure += 1 + if error: + stat.recent_errors.append(error) + if len(stat.recent_errors) > 50: + stat.recent_errors = stat.recent_errors[-50:] + if "quota" in error.lower(): + self._cooldown_until[script_id] = time.time() + 900.0 + + def snapshot(self) -> dict[str, str]: + return {sid: self.state(sid) for sid in self._stats.keys()} + + def state(self, script_id: str) -> str: + stat = self._stats.get(script_id) + if stat is None: + return "UNKNOWN" + now = time.time() + if self._cooldown_until.get(script_id, 0.0) > now: + return "COOLDOWN" + return stat.state() + + def success_rate(self, script_id: str, window_s: float = 900.0) -> float: + now = time.time() + events = self._events.get(script_id, []) + recent = [ok for ts, ok in events if ts >= now - max(1.0, window_s)] + if not recent: + return 0.0 + return sum(1 for ok in recent if ok) / len(recent) + + def predict_exhaustion_risk(self, script_id: str) -> str: + sr = self.success_rate(script_id) + if self.state(script_id) == "COOLDOWN": + return "HIGH" + if sr < 0.35: + return "HIGH" + if sr < 0.65: + return "MEDIUM" + return "LOW" diff --git a/src/proxy/proxy_server.py b/src/proxy/proxy_server.py index c6c2fe9..78e79ba 100644 --- a/src/proxy/proxy_server.py +++ b/src/proxy/proxy_server.py @@ -38,6 +38,7 @@ TRACE_HOST_SUFFIXES, UNCACHEABLE_HEADER_NAMES, ) +from core.dns_cache import DnsCache from relay.domain_fronter import DomainFronter from .socks5 import negotiate_socks5 from .proxy_support import ( @@ -96,6 +97,10 @@ def __init__(self, config: dict): self._tcp_connect_timeout = self._cfg_float( config, "tcp_connect_timeout", TCP_CONNECT_TIMEOUT, minimum=1.0, ) + self._dns_cache = DnsCache( + ttl_s=float(config.get("dns_cache_ttl_s", 120.0)), + max_items=int(config.get("dns_cache_max_items", 4096)), + ) self._download_min_size = self._cfg_int( config, "chunked_download_min_size", 5 * 1024 * 1024, minimum=0, ) @@ -520,7 +525,11 @@ async def _on_socks_client(self, reader: asyncio.StreamReader, result = await negotiate_socks5(reader, writer) if result is None: return - host, port = result + cmd, host, port = result + if cmd == "udp_associate": + log.info("SOCKS5 UDP ASSOCIATE requested by %s", addr) + await self._handle_socks5_udp_associate(reader, writer) + return log.info("SOCKS5 CONNECT → %s:%d", host, port) await self._handle_target_tunnel(host, port, reader, writer) except asyncio.IncompleteReadError: @@ -539,6 +548,29 @@ async def _on_socks_client(self, reader: asyncio.StreamReader, except Exception: pass + + + async def _handle_socks5_udp_associate(self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + """Handle SOCKS5 UDP ASSOCIATE for low-latency UDP apps (e.g., games).""" + loop = asyncio.get_running_loop() + transport, protocol = await loop.create_datagram_endpoint( + lambda: _Socks5UdpRelayProtocol(self.fronter), + local_addr=(self.socks_host, 0), + ) + sockname = transport.get_extra_info("sockname") + bind_ip = sockname[0] if sockname else self.socks_host + bind_port = sockname[1] if sockname else 0 + + try: + writer.write(b"\x05\x00\x00\x01" + socket.inet_aton(bind_ip) + bind_port.to_bytes(2, "big")) + await writer.drain() + await reader.read() + finally: + transport.close() + + # ── CONNECT (HTTPS tunnelling) ──────────────────────────────── async def _do_connect(self, target: str, reader, writer): @@ -782,30 +814,35 @@ async def _open_tcp_connection(self, target: str, port: int, ipaddress.ip_address(lookup_target) candidates = [(0, lookup_target)] except ValueError: - try: - infos = await asyncio.wait_for( - loop.getaddrinfo( - lookup_target, - port, - family=socket.AF_UNSPEC, - type=socket.SOCK_STREAM, - ), - timeout=timeout, - ) - except Exception as exc: - raise OSError(f"dns lookup failed for {lookup_target}: {exc!r}") from exc - - candidates = [] - seen = set() - for family, _type, _proto, _canon, sockaddr in infos: - ip = sockaddr[0] - key = (family, ip) - if key in seen: - continue - seen.add(key) - candidates.append((family, ip)) - - candidates.sort(key=lambda item: 0 if item[0] == socket.AF_INET else 1) + cached = self._dns_cache.get(lookup_target) + if cached is not None: + candidates = cached + else: + try: + infos = await asyncio.wait_for( + loop.getaddrinfo( + lookup_target, + port, + family=socket.AF_UNSPEC, + type=socket.SOCK_STREAM, + ), + timeout=timeout, + ) + except Exception as exc: + raise OSError(f"dns lookup failed for {lookup_target}: {exc!r}") from exc + + candidates = [] + seen = set() + for family, _type, _proto, _canon, sockaddr in infos: + ip = sockaddr[0] + key = (family, ip) + if key in seen: + continue + seen.add(key) + candidates.append((family, ip)) + + candidates.sort(key=lambda item: 0 if item[0] == socket.AF_INET else 1) + self._dns_cache.set(lookup_target, candidates) for family, ip in candidates: try: @@ -1408,3 +1445,72 @@ async def _do_http(self, header_block: bytes, reader, writer): writer.write(response) await writer.drain() + + +class _Socks5UdpRelayProtocol(asyncio.DatagramProtocol): + """Best-effort SOCKS5 UDP relay (single-hop direct UDP forwarding).""" + + def __init__(self, fronter): + self.transport: asyncio.DatagramTransport | None = None + self._client_addr = None + self._fronter = fronter + self._loop = asyncio.get_running_loop() + + def connection_made(self, transport): + self.transport = transport + + + async def _forward_udp(self, dst: str, dport: int, payload: bytes): + if self.transport is None or self._client_addr is None: + return + relayed = await self._fronter.relay_udp_packet(dst, dport, payload) + if relayed is not None: + header = self._build_socks_header(dst, dport) + if header is not None: + self.transport.sendto(header + relayed, self._client_addr) + return + self.transport.sendto(payload, (dst, dport)) + + @staticmethod + def _build_socks_header(host: str, port: int) -> bytes | None: + try: + host_raw = socket.inet_aton(host) + return b"\x00\x00\x00\x01" + host_raw + port.to_bytes(2, "big") + except OSError: + return None + def datagram_received(self, data, addr): + if self.transport is None: + return + + if self._client_addr is None or addr == self._client_addr: + self._client_addr = addr + if len(data) < 10 or data[2] != 0x00: + return + atyp = data[3] + pos = 4 + if atyp == 0x01: # IPv4 + dst = socket.inet_ntoa(data[pos:pos + 4]) + pos += 4 + elif atyp == 0x03: # Domain + ln = data[pos] + pos += 1 + dst = data[pos:pos + ln].decode(errors="replace") + pos += ln + elif atyp == 0x04: # IPv6 + dst = socket.inet_ntop(socket.AF_INET6, data[pos:pos + 16]) + pos += 16 + else: + return + dport = int.from_bytes(data[pos:pos + 2], "big") + payload = data[pos + 2:] + self._loop.create_task(self._forward_udp(dst, dport, payload)) + return + + # response from remote server -> encapsulate back to client + host, port = addr[0], addr[1] + try: + host_raw = socket.inet_aton(host) + header = b"\x00\x00\x00\x01" + host_raw + port.to_bytes(2, "big") + except OSError: + return + self.transport.sendto(header + data, self._client_addr) diff --git a/src/proxy/socks5.py b/src/proxy/socks5.py index 6af0375..97fd920 100644 --- a/src/proxy/socks5.py +++ b/src/proxy/socks5.py @@ -22,8 +22,8 @@ async def negotiate_socks5( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, -) -> tuple[str, int] | None: - """Perform a SOCKS5 handshake and return the requested (host, port). + ) -> tuple[str, str, int] | None: + """Perform a SOCKS5 handshake and return the requested SOCKS command and destination tuple. Sends protocol-level replies directly to *writer*. Returns ``None`` and leaves the connection in a closed state if negotiation fails at @@ -55,8 +55,8 @@ async def negotiate_socks5( # ── Request ─────────────────────────────────────────────────── req = await asyncio.wait_for(reader.readexactly(4), timeout=15) ver, cmd, _rsv, atyp = req - if ver != 5 or cmd != 0x01: - # Only CONNECT (0x01) is supported + if ver != 5 or cmd not in (0x01, 0x03): + # Support CONNECT (0x01) and UDP ASSOCIATE (0x03) only. writer.write(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00") await writer.drain() return None @@ -85,4 +85,5 @@ async def negotiate_socks5( writer.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00") await writer.drain() - return host, port + cmd_name = "connect" if cmd == 0x01 else "udp_associate" + return cmd_name, host, port diff --git a/src/relay/domain_fronter.py b/src/relay/domain_fronter.py index 842eeab..d9e264f 100644 --- a/src/relay/domain_fronter.py +++ b/src/relay/domain_fronter.py @@ -1562,6 +1562,44 @@ async def relay(self, method: str, url: str, latency_ns = int((time.perf_counter() - t0) * 1e9) self._record_site(url, len(result), latency_ns, errored) + async def relay_udp_packet(self, host: str, port: int, payload: bytes) -> bytes | None: + """Relay a single UDP packet via exit node over Apps Script. + + This encapsulates UDP into the existing HTTPS relay chain: + client UDP -> SOCKS5 UDP ASSOCIATE -> Apps Script -> exit node -> UDP target + """ + if not self._exit_node_enabled or not self._exit_node_url: + return None + + inner = { + "k": self._exit_node_psk, + "udp": 1, + "host": host, + "port": int(port), + "payload": base64.b64encode(payload).decode(), + } + inner_json = json.dumps(inner).encode() + outer = self._build_payload( + "POST", + self._exit_node_url, + {"Content-Type": "application/json"}, + inner_json, + ) + outer["ct"] = "application/json" + + raw = await self._batch_submit(outer) + _, _, relay_bytes = split_raw_response(raw) + obj = load_relay_json(relay_bytes) + if not isinstance(obj, dict) or obj.get("e"): + return None + b64 = obj.get("payload") + if not isinstance(b64, str) or not b64: + return b"" + try: + return base64.b64decode(b64) + except Exception: + return None + async def _coalesced_submit(self, key: str, payload: dict) -> bytes: """Dedup concurrent requests for the same URL (no Range header). @@ -2851,4 +2889,3 @@ def _parse_batch_body(self, resp_body: bytes, for item in items: results.append(parse_relay_json(item, self._max_response_body_bytes)) return results - diff --git a/src/relay/relay_response.py b/src/relay/relay_response.py index 412f38c..8f5f284 100644 --- a/src/relay/relay_response.py +++ b/src/relay/relay_response.py @@ -819,4 +819,14 @@ def parse_relay_response(body: bytes, max_body_bytes: int) -> bytes: log.warning("JSON parse failed. Response: %s", preview[:200]) return error_response(502, error_msg) + # New Code.gs contract can wrap legacy payload in: + # {ok:boolean, code:string, message:string, data:object|null} + if {"ok", "code", "message", "data"}.issubset(data.keys()): + if not data.get("ok"): + return error_response(502, f"Relay API error [{data.get('code')}]: {data.get('message')}") + wrapped = data.get("data") + if not isinstance(wrapped, dict): + return error_response(502, "Relay API returned invalid data envelope") + data = wrapped + return parse_relay_json(data, max_body_bytes) diff --git a/tests/test_adaptive_transport.py b/tests/test_adaptive_transport.py new file mode 100644 index 0000000..a607724 --- /dev/null +++ b/tests/test_adaptive_transport.py @@ -0,0 +1,46 @@ +import pathlib +import sys +import tempfile +import unittest + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from core.adaptive_transport.engine import AdaptiveRouteEngine +from core.adaptive_transport.hygiene import validate_public_ip +from core.adaptive_transport.models import ProbeTarget, RouteScore + + +class HygieneTests(unittest.TestCase): + def test_rejects_private(self): + with self.assertRaises(ValueError): + validate_public_ip("10.0.0.1") + + def test_accepts_public(self): + self.assertEqual(validate_public_ip("8.8.8.8"), "8.8.8.8") + + +class EngineTests(unittest.IsolatedAsyncioTestCase): + async def test_circuit_breaker(self): + with tempfile.TemporaryDirectory() as td: + engine = AdaptiveRouteEngine(f"{td}/intel.db") + t = ProbeTarget(ip="8.8.8.8", port=443, sni="www.google.com") + for _ in range(engine.cfg.circuit_breaker_failures - 1): + self.assertFalse(engine.register_route_failure(t)) + self.assertTrue(engine.register_route_failure(t)) + + async def test_select_route_sticky(self): + with tempfile.TemporaryDirectory() as td: + engine = AdaptiveRouteEngine(f"{td}/intel.db") + t = ProbeTarget(ip="8.8.8.8", port=443, sni="www.google.com") + r1 = RouteScore(t, 100, 1, 0.0, 1.0, 0.9, 0.8) + r2 = RouteScore(t, 90, 1, 0.0, 1.0, 0.9, 0.81) + chosen = await engine.select_route([r1], gameplay_active=True) + chosen2 = await engine.select_route([r2], gameplay_active=True) + self.assertIs(chosen, chosen2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config_sync.py b/tests/test_config_sync.py new file mode 100644 index 0000000..ed6b35a --- /dev/null +++ b/tests/test_config_sync.py @@ -0,0 +1,32 @@ +import json +import pathlib +import tempfile +import unittest + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +import sys +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from core.config_sync import ConfigSyncManager + + +class ConfigSyncTests(unittest.TestCase): + def test_sync_updates_codegs_and_worker(self): + with tempfile.TemporaryDirectory() as td: + repo = pathlib.Path(td) + (repo / "apps_script").mkdir() + (repo / "apps_script" / "Code.gs").write_text('const AUTH_KEY = "OLD";\n', encoding="utf-8") + (repo / "apps_script" / "cloudflare_worker.js").write_text('const PSK = "OLD";\n', encoding="utf-8") + cfg = {"script_id": "abc", "auth_key": "NEWKEY", "exit_node": {"psk": "NEWPSK"}} + sync = ConfigSyncManager(repo) + artifacts = sync.sync_all_configs(cfg) + self.assertIn('const AUTH_KEY = "NEWKEY";', artifacts["apps_script/Code.gs"]) + self.assertIn('const PSK = "NEWPSK";', artifacts["apps_script/cloudflare_worker.js"]) + state = json.loads((repo / ".config_sync_state.json").read_text(encoding="utf-8")) + self.assertIn("checksums", state) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_dns_quota.py b/tests/test_dns_quota.py new file mode 100644 index 0000000..cfd2439 --- /dev/null +++ b/tests/test_dns_quota.py @@ -0,0 +1,43 @@ +import pathlib +import sys +import time +import unittest + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from core.dns_cache import DnsCache +from core.quota_monitor import QuotaMonitor + + +class DnsQuotaTests(unittest.TestCase): + def test_dns_cache_expiry(self): + cache = DnsCache(ttl_s=1.0, max_items=64) + cache.set("a.test", [(2, "1.1.1.1")]) + self.assertIsNotNone(cache.get("a.test")) + cache._cache["a.test"].expires_at = time.time() - 1 + self.assertIsNone(cache.get("a.test")) + + def test_quota_monitor_states(self): + qm = QuotaMonitor() + qm.record("sid1", ok=True) + self.assertEqual(qm.snapshot()["sid1"], "GOOD") + for _ in range(5): + qm.record("sid2", ok=False, error="quota exceeded") + self.assertEqual(qm.snapshot()["sid2"], "COOLDOWN") + self.assertEqual(qm.predict_exhaustion_risk("sid2"), "HIGH") + + def test_quota_success_rate_prediction(self): + qm = QuotaMonitor() + for _ in range(8): + qm.record("sid3", ok=True) + for _ in range(2): + qm.record("sid3", ok=False, error="transient") + self.assertGreaterEqual(qm.success_rate("sid3"), 0.75) + self.assertEqual(qm.predict_exhaustion_risk("sid3"), "LOW") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_google_ip_scanner_probe.py b/tests/test_google_ip_scanner_probe.py new file mode 100644 index 0000000..9f7efde --- /dev/null +++ b/tests/test_google_ip_scanner_probe.py @@ -0,0 +1,50 @@ +import asyncio +import pathlib +import sys +import unittest +from unittest.mock import AsyncMock, Mock, patch + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from core.google_ip_scanner import _classify_fronted_failure, probe_via_fronted_h2 + + +class ScannerProbeTests(unittest.IsolatedAsyncioTestCase): + def test_html_detection(self): + err = _classify_fronted_failure(b"HTTP/1.1 200 OK\r\n\r\nlogin", "www.google.com") + self.assertEqual(err, "html_login_page") + + def test_invalid_json_detection(self): + payload = b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{bad-json" + err = _classify_fronted_failure(payload, "www.google.com") + self.assertEqual(err, "invalid_json_response") + + async def test_timeout_retry_behavior(self): + with patch("core.google_ip_scanner.asyncio.open_connection", new=AsyncMock(side_effect=asyncio.TimeoutError)): + with patch("core.google_ip_scanner.asyncio.sleep", new=AsyncMock()) as sleep_mock: + result = await probe_via_fronted_h2("8.8.8.8", "www.google.com", timeout=0.01, retries=3) + self.assertFalse(result.ok) + self.assertEqual(result.error, "timeout") + self.assertEqual(sleep_mock.await_count, 2) + + async def test_restricted_network_html_classification(self): + fake_reader = AsyncMock() + fake_reader.read = AsyncMock(return_value=b"HTTP/1.1 200 OK\r\n\r\nSign In") + fake_writer = Mock() + fake_writer.write = Mock() + fake_writer.close = Mock() + fake_writer.drain = AsyncMock(return_value=None) + fake_writer.wait_closed = AsyncMock(return_value=None) + fake_ssl = type("SSL", (), {"selected_alpn_protocol": lambda self: "http/1.1"})() + fake_writer.get_extra_info = lambda key: fake_ssl if key == "ssl_object" else None + with patch("core.google_ip_scanner.asyncio.open_connection", new=AsyncMock(return_value=(fake_reader, fake_writer))): + result = await probe_via_fronted_h2("1.1.1.1", "www.google.com", timeout=0.1, retries=1) + self.assertFalse(result.ok) + self.assertEqual(result.error, "html_login_page") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_logging_utils.py b/tests/test_logging_utils.py new file mode 100644 index 0000000..5ef05b9 --- /dev/null +++ b/tests/test_logging_utils.py @@ -0,0 +1,30 @@ +import logging +import pathlib +import sys +import unittest + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from core.logging_utils import PrettyFormatter + + +class LoggingUtilsTests(unittest.TestCase): + def test_metadata_suffix_rendered(self): + fmt = PrettyFormatter(use_color=False) + rec = logging.LogRecord( + name="Proxy", level=logging.INFO, pathname=__file__, lineno=1, msg="relay event", args=(), exc_info=None + ) + rec.request_type = "POST" + rec.status_code = 200 + rec.success = True + out = fmt.format(rec) + self.assertIn("request_type=POST", out) + self.assertIn("status_code=200", out) + self.assertIn("success=True", out) + + +if __name__ == "__main__": + unittest.main()