diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 07076d7..97d8158 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,21 +6,30 @@ name: release # registered as the trusted publisher, so the upload step cannot be moved into # another workflow without breaking the OIDC match. # -# Two entry points: +# Three entry points: # * push to main -> release-please maintains a release PR; when that PR is # merged, release-please bumps the version, tags, and cuts # a GitHub Release. The same run then builds and publishes. -# * workflow_dispatch -> a TestPyPI-only dry run. Builds a throwaway +# * workflow_dispatch (no tag) -> a TestPyPI-only dry run. Builds a throwaway # ``.devNNNN`` so the chain (OIDC -> environment -> # trusted publisher -> upload) can be exercised without # cutting a real release or colliding with an existing one. +# * workflow_dispatch (tag set) -> publish that exact existing tag. Used to +# salvage a tag that was cut (release_please succeeded) but +# did NOT publish (e.g. an earlier output-gate misfire). +# Builds from the tagged commit's pyproject.toml verbatim, +# pushes to TestPyPI and PyPI. on: push: branches: [main] workflow_dispatch: inputs: + tag: + description: "Existing tag to (re-)publish (e.g. v0.51.0). Leave blank for a TestPyPI dry run." + required: false + default: "" reason: - description: "Why this manual TestPyPI dry run (audit note only)." + description: "Why this manual dispatch (audit note only)." required: false default: "manual TestPyPI dry run" @@ -34,15 +43,26 @@ concurrency: cancel-in-progress: false jobs: - # 1. release-please: the only job that runs on an ordinary push to main. - # It keeps the release PR in sync and, on merge, creates the tag/release. + # 1. release-please: only runs on push to main (and is a no-op on tag-replay + # dispatches). It keeps the release PR in sync and, on merge, creates the + # tag/release. 
release-please: name: Release PR / tag + if: ${{ github.event_name == 'push' }} runs-on: ubuntu-latest outputs: - release_created: ${{ steps.rp.outputs['.--release_created'] }} - tag_name: ${{ steps.rp.outputs['.--tag_name'] }} - version: ${{ steps.rp.outputs['.--version'] }} + # Single-root manifest config: release-please-action v4 exposes a + # top-level ``release_created`` boolean that's "true" exactly when a + # release was cut on this run, alongside path-prefixed keys for + # multi-package configs. The path-prefixed access + # (``outputs['.--release_created']``) silently evaluates to empty for a + # single-root config -- that latent bug never fired here only because + # the engine's first publish (v0.50.3) went out via workflow_dispatch and + # bypassed this gate. The release-please push path needs the top-level + # key to publish. + release_created: ${{ steps.rp.outputs.release_created }} + tag_name: ${{ steps.rp.outputs.tag_name }} + version: ${{ steps.rp.outputs.version }} steps: - uses: googleapis/release-please-action@v4 id: rp @@ -50,19 +70,31 @@ jobs: config-file: release-please-config.json manifest-file: .release-please-manifest.json - # 2. build: runs on a real release (release_created) OR a manual dry run. + # 2. build: runs on a real release (release_created) OR any dispatch. # Produces a single sdist+wheel artifact consumed by both publish jobs so # the exact same bytes go to TestPyPI and (for stable releases) PyPI. build: name: Build distribution needs: release-please - if: ${{ needs.release-please.outputs.release_created == 'true' || github.event_name == 'workflow_dispatch' }} + # ``needs.release-please`` is skipped on a workflow_dispatch (the job's + # own ``if`` excludes it), and ``always()`` lets us still run as long as + # we're on a release path that justifies a build. 
+ if: | + always() && ( + github.event_name == 'workflow_dispatch' + || needs.release-please.outputs.release_created == 'true' + ) runs-on: ubuntu-latest outputs: version: ${{ steps.ver.outputs.version }} is_prerelease: ${{ steps.ver.outputs.is_prerelease }} + publish_pypi: ${{ steps.ver.outputs.publish_pypi }} steps: - uses: actions/checkout@v4 + with: + # For a tag-replay dispatch we want the tagged commit's + # pyproject.toml verbatim, not whatever HEAD is on main. + ref: ${{ github.event.inputs.tag || github.ref }} - uses: astral-sh/setup-uv@v6 with: enable-cache: true @@ -72,23 +104,30 @@ jobs: run: | set -euo pipefail BASE=$(python3 -c "import tomllib;print(tomllib.load(open('pyproject.toml','rb'))['project']['version'])") - if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then + INPUT_TAG="${{ github.event.inputs.tag }}" + if [ "${{ github.event_name }}" = "workflow_dispatch" ] && [ -z "$INPUT_TAG" ]; then # Dry run: never collide with an existing TestPyPI version, never # look like a real release. Only the project version line is # anchored at column 0, so this rewrites exactly that one line. VER="${BASE}.dev$(date +%Y%m%d%H%M%S)" sed -i -E "s/^version = \".*\"/version = \"${VER}\"/" pyproject.toml echo "is_prerelease=true" >> "$GITHUB_OUTPUT" + echo "publish_pypi=false" >> "$GITHUB_OUTPUT" else - # Real release: trust release-please's bump already on main HEAD. + # Real release (release-please push) OR tag replay (dispatch with + # tag input): trust the checked-out pyproject.toml verbatim. 
VER="$BASE" case "$VER" in - *a*|*b*|*rc*|*dev*|*post*) echo "is_prerelease=true" >> "$GITHUB_OUTPUT" ;; - *) echo "is_prerelease=false" >> "$GITHUB_OUTPUT" ;; + *a*|*b*|*rc*|*dev*|*post*) + echo "is_prerelease=true" >> "$GITHUB_OUTPUT" + echo "publish_pypi=false" >> "$GITHUB_OUTPUT" ;; + *) + echo "is_prerelease=false" >> "$GITHUB_OUTPUT" + echo "publish_pypi=true" >> "$GITHUB_OUTPUT" ;; esac fi echo "version=$VER" >> "$GITHUB_OUTPUT" - echo "Building version $VER" + echo "Building version $VER (publish_pypi=$(grep ^publish_pypi $GITHUB_OUTPUT | cut -d= -f2))" - name: Build run: uv build - name: Show artifacts @@ -99,12 +138,17 @@ jobs: path: dist/ if-no-files-found: error - # 3. TestPyPI: always runs after a successful build (dry run or real release). - # skip-existing keeps a re-run idempotent instead of hard-failing on a - # version that is already on the index. + # 3. TestPyPI: always runs after a successful build (dry run or real release + # or tag replay). ``always() && build == success`` is required because + # GitHub Actions' implicit ``success()`` gate evaluates the *transitive* + # needs graph, so a skipped ``release-please`` (workflow_dispatch path) + # cascades a skip through ``build`` even when build itself succeeded. + # ``skip-existing`` keeps a re-run idempotent instead of hard-failing on + # a version that is already on the index. testpypi: name: Publish to TestPyPI needs: build + if: ${{ always() && needs.build.result == 'success' }} runs-on: ubuntu-latest environment: testpypi permissions: @@ -119,13 +163,15 @@ jobs: repository-url: https://test.pypi.org/legacy/ skip-existing: true - # 4. PyPI: only for a real, non-prerelease release. The `pypi` environment - # carries the manual-approval gate and is matched by the prod trusted - # publisher (configure both before the first stable tag). + # 4. PyPI: stable releases only. 
Driven by the build's own ``publish_pypi`` + # output so both the release-please path and a tag-replay dispatch share + # the same decision rule (non-prerelease version => publish). The + # ``always() && build == success`` clause is the same skip-cascade + # countermeasure as on ``testpypi``. pypi: name: Publish to PyPI - needs: [release-please, build] - if: ${{ needs.release-please.outputs.release_created == 'true' && needs.build.outputs.is_prerelease == 'false' }} + needs: build + if: ${{ always() && needs.build.result == 'success' && needs.build.outputs.publish_pypi == 'true' }} runs-on: ubuntu-latest environment: pypi permissions: @@ -136,3 +182,5 @@ jobs: name: dist path: dist/ - uses: pypa/gh-action-pypi-publish@release/v1 + with: + skip-existing: true diff --git a/mithwire/__init__.py b/mithwire/__init__.py index 8802a3a..4915ebd 100644 --- a/mithwire/__init__.py +++ b/mithwire/__init__.py @@ -15,6 +15,7 @@ from mithwire.core.element import Element from mithwire.core.tab import Tab from mithwire.core.util import loop, start +from mithwire.stealth import FingerprintConfig, Stealth, compute_launch_args __all__ = [ "loop", @@ -27,6 +28,9 @@ "Element", "ContraDict", "ProtocolException", + "FingerprintConfig", + "Stealth", + "compute_launch_args", ] __version__ = "0.50.3" \ No newline at end of file diff --git a/mithwire/cdp/network.py b/mithwire/cdp/network.py index 0c56c46..cc87656 100644 --- a/mithwire/cdp/network.py +++ b/mithwire/cdp/network.py @@ -1342,7 +1342,7 @@ class Cookie: #: Cookie expiration date as the number of seconds since the UNIX epoch. #: The value is set to -1 if the expiry date is not set. #: The value can be null for values that cannot be represented in - #: JSON (±Inf). + #: JSON (±Inf). expires: typing.Optional[float] = None #: Cookie SameSite type.
diff --git a/mithwire/core/browser.py b/mithwire/core/browser.py index 5a3b028..2a49f12 100644 --- a/mithwire/core/browser.py +++ b/mithwire/core/browser.py @@ -121,6 +121,7 @@ def __init__(self, config: Config, **kwargs): self._keep_user_data_dir = None self._is_updating = asyncio.Event() self.connection: Connection = None + self.stealth = None super().__init__("", auto_attach=False) logger.debug("Session object initialized: %s" % vars(self)) @@ -410,8 +411,45 @@ async def start(self=None) -> Browser: self.websocket_url = self.info.webSocketDebuggerUrl await self.attach() await self.update_targets() + await self._apply_stealth() # await self + async def _apply_stealth(self) -> None: + """Apply the engine-owned anti-detect stealth to the live browser. + + The engine owns every browser-altering anti-detect capability, so this + runs on every launch. With no configured identity it still applies the + always-on baseline (window.chrome shim, headless UA cleanup when + headless, WebRTC leak protection when proxied). The resulting + :class:`~mithwire.stealth.Stealth` is stored on ``self.stealth`` so a + client can re-apply an identity later (e.g. once a proxy egress geo is + resolved). + """ + from ..stealth import Stealth + + config = self.config + # The engine is agnostic of any client's proxy abstraction: proxy + # presence is inferred purely from the launch flags. + proxied = any( + str(arg).startswith("--proxy-server=") + for arg in (getattr(config, "_browser_args", None) or []) + ) + stealth = Stealth( + self, + fingerprint=getattr(config, "fingerprint", None), + webrtc_leak_protection=getattr(config, "webrtc_leak_protection", "auto"), + headless=bool(getattr(config, "headless", False)), + proxied=proxied, + ) + # A freshly attached tab needs a brief moment before CDP overrides and + # new-document scripts reliably register on the about:blank target. 
+ await asyncio.sleep(1.2) + try: + await stealth.apply_all() + except Exception as exc: # noqa: BLE001 + logger.warning("Anti-detect stealth application failed: %s", exc) + self.stealth = stealth + async def grant_all_permissions(self): """ grant permissions for: diff --git a/mithwire/core/config.py b/mithwire/core/config.py index 6dcad0b..aef4765 100644 --- a/mithwire/core/config.py +++ b/mithwire/core/config.py @@ -45,6 +45,8 @@ def __init__( host: str = AUTO, port: int = AUTO, expert: bool = AUTO, + fingerprint: Optional[object] = None, + webrtc_leak_protection: str = "auto", **kwargs: dict, ): """ @@ -110,6 +112,17 @@ def __init__( self.autodiscover_targets = True self.lang = lang + # Anti-detect stealth identity. The engine owns all browser-altering + # anti-detect code; a client only describes the identity it wants here. + # ``fingerprint`` may be a FingerprintConfig or a plain dict (normalized). + from ..stealth import FingerprintConfig + + if fingerprint is None or isinstance(fingerprint, FingerprintConfig): + self.fingerprint = fingerprint + else: + self.fingerprint = FingerprintConfig.from_dict(fingerprint) + self.webrtc_leak_protection = webrtc_leak_protection + # other keyword args will be accessible by attribute self.__dict__.update(kwargs) super().__init__() @@ -127,6 +140,18 @@ def __init__( "--disable-session-crashed-bubble", "--disable-search-engine-choice-screen", ] + # Stealth launch flags that must be set before the process starts + # (--lang, --force-webrtc-ip-handling-policy, headless window size). + # These cannot be retrofitted leak-free via CDP on a running process. 
+ from ..stealth import compute_launch_args + + for arg in compute_launch_args( + self._browser_args, + fingerprint=self.fingerprint, + headless=self.headless, + ): + if arg not in self._browser_args: + self._browser_args.append(arg) @property def browser_args(self): diff --git a/mithwire/core/util.py b/mithwire/core/util.py index daedcab..4042cf7 100644 --- a/mithwire/core/util.py +++ b/mithwire/core/util.py @@ -42,6 +42,8 @@ async def start( host: Optional[str] = None, port: Optional[int] = None, expert: Optional[bool] = None, + fingerprint: Optional[object] = None, + webrtc_leak_protection: str = "auto", **kwargs: Optional[dict], ) -> Browser: """ @@ -96,6 +98,8 @@ async def start( host=host, port=port, expert=expert, + fingerprint=fingerprint, + webrtc_leak_protection=webrtc_leak_protection, **kwargs, ) from .browser import Browser diff --git a/mithwire/stealth/__init__.py b/mithwire/stealth/__init__.py new file mode 100644 index 0000000..aed520b --- /dev/null +++ b/mithwire/stealth/__init__.py @@ -0,0 +1,80 @@ +"""Engine-owned anti-detect stealth. + +The mithwire engine owns every browser-altering anti-detect capability so that +*any* client (the mithwire-mcp server, or a custom script) gets identical +stealth simply by describing the identity it wants. A client never reimplements +patching; it passes a :class:`FingerprintConfig` and a WebRTC mode to the engine. + +Public surface: + +* :class:`FingerprintConfig` -- declarative identity description. +* :class:`Stealth` -- applies the patches to a live :class:`~mithwire.Browser`. +* :func:`compute_launch_args` -- the stealth-relevant Chromium command-line + flags (``--lang``, ``--force-webrtc-ip-handling-policy``, headless window + size) that must be set at launch, before the process starts. 
def compute_launch_args(
    browser_args: list[str],
    *,
    fingerprint: "FingerprintConfig | None" = None,
    headless: bool = False,
) -> list[str]:
    """Return the stealth launch flags to append, given the existing args.

    These flags MUST be applied at launch (they cannot be retrofitted via CDP
    on an already-spawned process without leaking):

    * ``--force-webrtc-ip-handling-policy`` — pinned per proxy presence. A proxy
      (detected by the presence of ``--proxy-server=`` in ``browser_args``)
      forces ``disable_non_proxied_udp`` so WebRTC can never reveal the real
      egress IP behind the proxy; a direct connection uses
      ``default_public_interface_only`` (its public IP is the legitimate one,
      but private/LAN IPs stay hidden).
    * ``--lang`` — Chromium applies it itself, so it propagates to
      ``navigator.language(s)``, the ``Accept-Language`` header, and Web Workers
      consistently. A runtime CDP override cannot rewrite ``navigator.languages``
      in already-spawned workers, so the launch flag is the only leak-free way.
    * ``--window-size`` — headless Chrome otherwise reports a default-ish screen
      that, combined with device-metric overrides, can produce impossible
      viewport/screen combinations.

    Args:
        browser_args: Flags already scheduled for the launch. ``None`` or an
            empty list is treated as "no flags". Entries are compared by their
            ``str()`` form, so non-string values do not raise.
        fingerprint: Optional identity; only its ``primary_language``
            attribute is read here.
        headless: Whether the browser is being launched headless.

    Returns:
        The flags to add, in the order WebRTC policy, ``--lang``,
        ``--window-size``. Existing flags are never duplicated and
        ``browser_args`` itself is not modified.
    """
    # Compare on the string form of each entry, mirroring how the browser
    # detects ``--proxy-server=``; a non-str entry must not crash the launch.
    existing = [str(arg) for arg in (browser_args or [])]
    extra: list[str] = []

    proxied = any(arg.startswith("--proxy-server=") for arg in existing)
    if not any("webrtc-ip-handling-policy" in arg for arg in existing):
        policy = "disable_non_proxied_udp" if proxied else "default_public_interface_only"
        extra.append(f"--force-webrtc-ip-handling-policy={policy}")

    if fingerprint is not None:
        lang = fingerprint.primary_language
        if lang and not any(arg.startswith("--lang=") for arg in existing):
            extra.append(f"--lang={lang}")

    if headless and not any(arg.startswith("--window-size=") for arg in existing):
        extra.append("--window-size=1920,1080")

    return extra
+ +This module was extracted verbatim (behaviour-preserving) from the historical +``mithwire_mcp.browser.BridgeBrowser`` so that ownership of the anti-detect +implementation lives with the browser engine, not a single client. +""" + +from __future__ import annotations + +import json +import logging +import re +from typing import Any + +from ..cdp import browser as cdp_browser +from ..cdp import emulation as cdp_emulation +from ..cdp import network as cdp_network +from ..cdp import page as cdp_page +from .fingerprint import FingerprintConfig + +logger = logging.getLogger(__name__) + + +class Stealth: + """Applies anti-detect patches to a live engine :class:`Browser`. + + Constructed against an already-started browser. ``apply_all`` runs the + standard launch-time sequence; individual methods (``apply_fingerprint``, + ``apply_timezone_override``) are also public so a client can re-apply an + identity later (e.g. once a proxy egress geo is resolved). + """ + + def __init__( + self, + browser: Any, + *, + fingerprint: FingerprintConfig | None = None, + webrtc_leak_protection: str = "auto", + headless: bool = False, + proxied: bool = False, + ) -> None: + self.browser = browser + self.fingerprint = fingerprint or FingerprintConfig() + # WebRTC leak protection mode. An HTTP/SOCKS proxy cannot carry STUN/UDP, + # so WebRTC queries STUN over the physical NIC and the server-reflexive + # (srflx) candidate betrays the real public IP -- the #1 proxy leak, and + # one no Chromium flag reliably closes. Modes: "auto" (filter only when + # proxied), "filter" (always), "disable" (remove RTCPeerConnection), "off". 
+ self.webrtc_leak_protection = (webrtc_leak_protection or "auto").strip().lower() + self.headless = headless + self.proxied = proxied + self.timezone_id: str | None = None + self.tab: Any = getattr(browser, "main_tab", None) + self._page_domain_tab: Any | None = None + + async def apply_all(self) -> None: + """Run the launch-time stealth sequence on the active tab.""" + self.tab = getattr(self.browser, "main_tab", None) + await self._inject_stealth_script() + await self._inject_webrtc_protection() + if self.headless: + await self._apply_headless_user_agent() + if not self.fingerprint.is_empty: + await self.apply_fingerprint(self.fingerprint) + + async def _ensure_page_domain(self) -> None: + """Enable the CDP Page domain once on the active tab. + + ``Page.addScriptToEvaluateOnNewDocument`` only actually injects when the + Page domain is enabled on that target's session (mithwire does the same + in ``_prepare_expert``). Without this, registered scripts silently never + run on subsequent documents. + """ + if self.tab is None: + return + if getattr(self, "_page_domain_tab", None) is self.tab: + return + try: + await self.tab.send(cdp_page.enable()) + self._page_domain_tab = self.tab + except Exception as exc: # noqa: BLE001 + logger.debug("Page.enable() failed: %s", exc) + + async def add_script_on_new_document(self, source: str) -> None: + await self._ensure_page_domain() + await self.tab.send(cdp_page.add_script_to_evaluate_on_new_document(source=source)) + + async def _inject_stealth_script(self) -> None: + await self._ensure_page_domain() + # Intentionally do NOT override navigator.webdriver here. Chromium + # already exposes it as a NATIVE getter on Navigator.prototype that + # returns `false` (it only flips to `true` under --enable-automation, + # which this launcher never sets). Re-defining it with + # Object.defineProperty(navigator, 'webdriver', ...) 
installs a + # non-native getter as an OWN property on the instance, which shadows + # the prototype getter and is itself a detectable tell (e.g. sannysoft + # "WebDriver (New)" flags the tampered descriptor even when the value is + # false). Verified against clean-Chrome and HEAD baselines: leaving the + # native getter untouched passes where the override fails. + # + # The chrome object shim is kept (no-op when window.chrome already + # exists, e.g. headful) to avoid an empty/missing window.chrome in some + # headless contexts. + script = """ + window.chrome = window.chrome || { runtime: {} }; + """ + await self.tab.send(cdp_page.add_script_to_evaluate_on_new_document(source=script)) + + def _resolve_webrtc_action(self) -> str | None: + """Decide the effective WebRTC action for this session ('filter'/'disable'/None).""" + mode = self.webrtc_leak_protection + if mode == "off": + return None + if mode == "disable": + return "disable" + if mode == "filter": + return "filter" + # "auto" (and any unknown value): protect only when proxied, since a + # direct connection's public WebRTC candidate is the legitimate IP. + return "filter" if self.proxied else None + + async def _inject_webrtc_protection(self) -> None: + """Inject the WebRTC leak guard as an all-frames new-document script. + + Runs before page scripts on every navigation/frame. Self-contained: it + bundles its own native-toString mask so the patched accessors/methods + stringify as native even when no fingerprint document JS is injected. 
+ """ + action = self._resolve_webrtc_action() + if action is None: + return + script = self._webrtc_protection_js(action) + try: + await self.tab.send( + cdp_page.add_script_to_evaluate_on_new_document(source=script) + ) + logger.info("Injected WebRTC leak protection (mode=%s).", action) + except Exception as exc: # noqa: BLE001 + logger.warning("Could not inject WebRTC leak protection: %s", exc) + + def _webrtc_protection_js(self, action: str) -> str: + if action == "disable": + # Remove the constructors outright. WebRTC absence is a mild tell but + # cannot leak. Both the standard and webkit-prefixed names are cleared. + return """ + (function () { + const drop = (name) => { + try { Object.defineProperty(window, name, { value: undefined, configurable: true }); } + catch (e) { try { delete window[name]; } catch (e2) {} } + }; + drop('RTCPeerConnection'); + drop('webkitRTCPeerConnection'); + drop('mozRTCPeerConnection'); + drop('RTCDataChannel'); + })(); + """ + # action == "filter": drop public, non-mDNS ICE candidates so the real + # IP never reaches the page. We patch only RTCPeerConnection.prototype + # members that are NORMALLY own properties of that prototype (the + # onicecandidate accessor, the localDescription accessors, and + # createOffer/createAnswer), so no own-property tell is introduced. + # + # We deliberately do NOT use the global Function.prototype.toString mask + # (_NATIVE_MASK_PREAMBLE) here: this guard is ALWAYS-ON (no-spoof path), + # and globally reassigning Function.prototype.toString is itself a strong + # CreepJS tell that cascades into ~9 component "lies" (Timezone, WebGL, + # Canvas, Audio, Math, ...). Instead each replacement gets a light, + # local own-`toString` so `fn.toString()`/`fn + ''` read native, without + # touching the global. (Advanced Function.prototype.toString.call probing + # of these specific WebRTC members is an accepted depth-layer gap -- far + # cheaper than re-leaking the real IP or tripping 9 lies.) 
+ return ( + r""" + (function () { + const RTC = window.RTCPeerConnection || window.webkitRTCPeerConnection; + if (!RTC || !RTC.prototype || RTC.prototype.__nrRtcGuard) return; + const proto = RTC.prototype; + const __nrMask = (fn, name) => { + try { + Object.defineProperty(fn, 'toString', { + value: function toString() { return 'function ' + name + '() { [native code] }'; }, + configurable: true, writable: true, + }); + } catch (e) {} + return fn; + }; + const isPublic = (addr) => { + if (!addr) return false; + addr = ('' + addr).toLowerCase(); + if (addr.indexOf('.local') >= 0 || addr.indexOf('mdns') >= 0) return false; + if (addr.indexOf(':') >= 0) { + return !(addr.indexOf('fe80') === 0 || addr.indexOf('fc') === 0 || addr.indexOf('fd') === 0); + } + if (/^(10\.|127\.|169\.254\.|192\.168\.|172\.(1[6-9]|2\d|3[01])\.)/.test(addr)) return false; + return /^\d{1,3}(\.\d{1,3}){3}$/.test(addr); + }; + const candAddr = (s) => { const p = ('' + s).split(' '); return p[4] || ''; }; + const candBlocked = (cand) => { + try { + const s = cand && (cand.candidate !== undefined ? cand.candidate : cand); + return s ? isPublic(candAddr(s)) : false; + } catch (e) { return false; } + }; + const scrubSdp = (sdp) => { + if (!sdp) return sdp; + return ('' + sdp).split('\r\n').filter((line) => { + const i = line.indexOf('candidate:'); + if (i < 0) return true; + return !isPublic(candAddr(line.slice(i + 'candidate:'.length))); + }).join('\r\n'); + }; + const wrapCb = (cb) => function (ev) { + try { if (ev && ev.candidate && candBlocked(ev.candidate)) return undefined; } catch (e) {} + return cb.apply(this, arguments); + }; + // 1) onicecandidate accessor (own accessor on the prototype): wrap + // the page's handler so srflx/public candidates are dropped. + try { + const od = Object.getOwnPropertyDescriptor(proto, 'onicecandidate'); + if (od && typeof od.set === 'function') { + const getter = function () { return od.get ? 
od.get.call(this) : null; }; + const setter = function (cb) { + return od.set.call(this, typeof cb === 'function' ? wrapCb(cb) : cb); + }; + __nrMask(getter, 'onicecandidate'); + __nrMask(setter, 'onicecandidate'); + Object.defineProperty(proto, 'onicecandidate', { + configurable: true, enumerable: od.enumerable, get: getter, set: setter, + }); + } + } catch (e) {} + // 2) localDescription family: scrub candidate lines from any SDP a + // page reads back after gathering. + ['localDescription', 'currentLocalDescription', 'pendingLocalDescription'].forEach((prop) => { + try { + const d = Object.getOwnPropertyDescriptor(proto, prop); + if (d && typeof d.get === 'function') { + const getter = function () { + const desc = d.get.call(this); + if (!desc || !desc.sdp) return desc; + try { return new RTCSessionDescription({ type: desc.type, sdp: scrubSdp(desc.sdp) }); } + catch (e) { return desc; } + }; + __nrMask(getter, prop); + Object.defineProperty(proto, prop, { + configurable: true, enumerable: d.enumerable, get: getter, + }); + } + } catch (e) {} + }); + // 3) createOffer/createAnswer (own methods): scrub the promise's SDP + // so non-trickle offers carry no public candidate. + ['createOffer', 'createAnswer'].forEach((m) => { + try { + const orig = proto[m]; + if (typeof orig !== 'function') return; + const wrapped = function () { + const r = orig.apply(this, arguments); + if (r && typeof r.then === 'function') { + return r.then((desc) => { + try { if (desc && desc.sdp) return { type: desc.type, sdp: scrubSdp(desc.sdp) }; } + catch (e) {} + return desc; + }); + } + return r; + }; + __nrMask(wrapped, m); + proto[m] = wrapped; + } catch (e) {} + }); + try { Object.defineProperty(proto, '__nrRtcGuard', { value: true }); } catch (e) {} + })(); + """ + ) + + async def _apply_headless_user_agent(self) -> None: + """Strip ``HeadlessChrome`` while keeping main-thread UA-CH populated. 
+ + Headless Chrome leaks the automation in ``navigator.userAgent`` (it + carries ``HeadlessChrome``), which DAB/sannysoft flag. Stripping it with a + CDP user-agent override is the fix -- but a UA-only override (no + ``userAgentMetadata``) BLANKS ``navigator.userAgentData`` (empty brands + + platform), and an empty brand list is itself a tell since a real Chrome + always exposes one. The earlier code hit exactly that trap whenever the + live high-entropy hints were unreadable (``getHighEntropyValues`` rejects + on ``about:blank`` right after launch), shipping a clean UA with blank + UA-CH. + + So we ALWAYS push the override WITH metadata: ``_build_ua_metadata`` + synthesizes the brand list and infers the host fields from the UA when + the live hints are blank. The UA string itself is only rewritten when the + legacy token is present. + + SCOPE: this covers the MAIN thread only -- the surface virtually all + detectors read. The override does NOT propagate to worker scopes, so a + Worker/ServiceWorker still exposes the raw ``HeadlessChrome`` UA and the + host's real high-entropy hints. Tools that cross-check window-vs-worker + navigator (e.g. CreepJS) therefore still see one inconsistency. Closing + that is a deliberate non-goal here: worker-scope UA spoofing needs CDP + target auto-attach and is a fragile depth layer most sites never probe. + """ + try: + current_ua = await self.tab.evaluate("navigator.userAgent") + except Exception as exc: + logger.warning("Could not read headless user-agent: %s", exc) + return + if not isinstance(current_ua, str) or not current_ua: + return + clean_ua = current_ua.replace("HeadlessChrome", "Chrome") + ua_changed = clean_ua != current_ua + + # Build metadata even when the live hints are unreadable. 
Right after + # launch ``navigator.userAgentData.getHighEntropyValues`` can reject (UA-CH + # not ready yet) and ``_read_client_hints`` returns None; passing ``{}`` + # lets ``_build_ua_metadata`` synthesize the brand list and infer the host + # fields purely from the UA string. Critically, headless leaves UA-CH + # blank regardless, so we must NEVER fall back to a UA-only override -- + # that BLANKS ``navigator.userAgentData.brands`` (the very tell we fix). + metadata = None + hints = await self._read_client_hints() + try: + metadata = self._build_ua_metadata(hints or {}, ua_string=clean_ua) + except Exception as exc: # noqa: BLE001 + logger.debug("Could not build client-hints metadata: %s", exc) + + if metadata is None: + # Only reachable if metadata synthesis itself failed; at that point a + # UA-only override would blank UA-CH, so apply it solely to strip a + # legacy headless token and otherwise leave UA-CH untouched. + if not ua_changed: + return + try: + await self.tab.send( + cdp_network.set_user_agent_override(user_agent=clean_ua) + ) + except Exception as exc: + logger.warning("Could not override headless user-agent: %s", exc) + return + + try: + await self.tab.send( + cdp_network.set_user_agent_override( + user_agent=clean_ua, user_agent_metadata=metadata + ) + ) + except Exception as exc: + logger.warning("Could not override headless user-agent: %s", exc) + return + + if ua_changed: + try: + await self.add_script_on_new_document( + "Object.defineProperty(navigator, 'userAgent', " + f"{{get: () => {json.dumps(clean_ua)}, configurable: true}});" + ) + except Exception as exc: # noqa: BLE001 + logger.debug("Could not inject UA new-document script: %s", exc) + logger.info( + "Applied headless UA-CH metadata (brands populated; UA %s).", + "rewritten" if ua_changed else "unchanged", + ) + + async def _read_client_hints(self) -> dict[str, Any] | None: + """Read the browser's own high-entropy User-Agent Client Hints.""" + script = """ + (async () => { + const 
uad = navigator.userAgentData; + if (!uad) return null; + let high = {}; + try { + high = await uad.getHighEntropyValues([ + "platform", "platformVersion", "architecture", + "bitness", "model", "fullVersionList" + ]); + } catch (e) {} + return { + brands: (uad.brands || []).map(b => ({brand: b.brand, version: b.version})), + mobile: !!uad.mobile, + platform: high.platform || uad.platform || "", + platformVersion: high.platformVersion || "", + architecture: high.architecture || "", + bitness: high.bitness || "", + model: high.model || "", + fullVersionList: (high.fullVersionList || []).map(b => ({brand: b.brand, version: b.version})), + }; + })() + """ + try: + data = await self.tab.evaluate(script, await_promise=True, return_by_value=True) + except Exception as exc: # noqa: BLE001 + logger.debug("Could not read client hints: %s", exc) + return None + return data if isinstance(data, dict) else None + + @staticmethod + def _chrome_versions(ua_string: str | None) -> tuple[str, str] | None: + """Extract ``(major, full)`` Chrome version from a UA string, if present.""" + if not ua_string: + return None + match = re.search(r"Chrome/(\d+)(?:\.[\d.]+)?", ua_string) + if not match: + return None + full_match = re.search(r"Chrome/([\d.]+)", ua_string) + full = full_match.group(1) if full_match else f"{match.group(1)}.0.0.0" + return match.group(1), full + + @staticmethod + def _infer_platform_hints(ua_string: str | None) -> tuple[str, str, str, str]: + """Infer ``(platform, platformVersion, architecture, bitness)`` from a UA. + + Used only when the live browser's real high-entropy hints are + unreadable (e.g. a custom UA set at launch while on ``about:blank``). + Getting ``platform`` right is what keeps ``Sec-CH-UA-Platform`` consistent + with ``navigator.userAgent``; the higher-entropy fields are best-effort. 
+ """ + ua = ua_string or "" + if "Windows" in ua: + return ("Windows", "15.0.0", "x86", "64") + if "Macintosh" in ua or "Mac OS X" in ua: + return ("macOS", "15.0.0", "x86" if "Intel" in ua else "arm", "64") + if "Android" in ua: + match = re.search(r"Android (\d+)", ua) + return ("Android", f"{match.group(1)}.0.0" if match else "14.0.0", "arm", "64") + if "CrOS" in ua: + return ("Chrome OS", "", "x86", "64") + if "Linux" in ua or "X11" in ua: + return ("Linux", "", "x86", "64") + return ("", "", "", "") + + def _synthesize_brands(self, major: str, full: str) -> tuple[list[Any], list[Any]]: + """Build a plausible Chromium brand set when real hints are unavailable. + + Reusing the live browser's own hints is always preferred (it carries the + exact, version-correct GREASE brand); this is only a last-resort fallback + so a custom UA never ships with empty ``userAgentData.brands``. + """ + emu = cdp_emulation + grease = 'Not;A=Brand' + brands = [ + emu.UserAgentBrandVersion(brand=grease, version="99"), + emu.UserAgentBrandVersion(brand="Chromium", version=major), + emu.UserAgentBrandVersion(brand="Google Chrome", version=major), + ] + full_list = [ + emu.UserAgentBrandVersion(brand=grease, version="99.0.0.0"), + emu.UserAgentBrandVersion(brand="Chromium", version=full), + emu.UserAgentBrandVersion(brand="Google Chrome", version=full), + ] + return brands, full_list + + def _build_ua_metadata( + self, + hints: dict[str, Any], + *, + platform_override: str | None = None, + ua_string: str | None = None, + ) -> Any: + """Build a CDP ``UserAgentMetadata`` consistent with the active UA. + + ``platform_override`` (e.g. ``"Windows"``) rewrites the UA-CH platform so + ``navigator.userAgentData.platform`` stays consistent with a spoofed + ``navigator.platform``. ``ua_string`` lets us re-version the Chromium / + Google Chrome brands to match a custom user-agent (so the low-entropy + brands and the full-version list agree with ``navigator.userAgent``). 
+ """ + emu = cdp_emulation + versions = self._chrome_versions(ua_string) + + def _is_chromium(brand: str) -> bool: + low = brand.lower() + return "chrom" in low # Chromium + Google Chrome, never the GREASE brand + + def _brand_list(raw: Any, *, full: bool) -> list[Any]: + out: list[Any] = [] + for item in raw or []: + brand = str(item.get("brand", "") or "") + if not brand: + continue + brand = brand.replace("HeadlessChrome", "Google Chrome") + version = str(item.get("version", "") or "") + if versions and _is_chromium(brand): + version = versions[1] if full else versions[0] + out.append(emu.UserAgentBrandVersion(brand=brand, version=version)) + return out + + brands = _brand_list(hints.get("brands"), full=False) + full_version_list = _brand_list(hints.get("fullVersionList"), full=True) + # Fall back to a synthesized set so a custom UA never blanks UA-CH. + if not brands and versions: + brands, full_version_list = self._synthesize_brands(versions[0], versions[1]) + # Infer host fields when the live hints are unavailable (about:blank). + inferred = ( + self._infer_platform_hints(ua_string) if not hints.get("platform") else None + ) + + def _field(key: str, idx: int) -> str: + real = str(hints.get(key, "") or "") + if real: + return real + return inferred[idx] if inferred else "" + + platform_value = platform_override or _field("platform", 0) + return emu.UserAgentMetadata( + platform=platform_value, + platform_version=_field("platformVersion", 1), + architecture=_field("architecture", 2), + model=str(hints.get("model", "") or ""), + mobile=bool(hints.get("mobile", False)), + brands=brands or None, + full_version_list=full_version_list or None, + bitness=_field("bitness", 3), + ) + + async def apply_fingerprint(self, fp: FingerprintConfig) -> dict[str, Any]: + """Apply an identity to the live session, engine-level where possible. 
+ + Order and mechanism are chosen for *consistency*: CDP ``Emulation.*`` + overrides (timezone, locale, UA/Accept-Language/platform, geolocation, + hardware concurrency, device metrics, touch) are applied inside Chromium + so they reach Web Workers and HTTP headers. Only ``deviceMemory`` and the + optional WebGL strings β€” which have no CDP override β€” are injected as + new-document JS (and eval'd once on the current document for immediate + effect). + """ + if self.tab is None: + raise RuntimeError("Browser not started") + emu = cdp_emulation + applied: dict[str, Any] = {} + + if fp.timezone_id: + try: + await self.tab.send(emu.set_timezone_override(timezone_id=fp.timezone_id)) + self.timezone_id = fp.timezone_id + applied["timezone_id"] = fp.timezone_id + except Exception as exc: # noqa: BLE001 + logger.warning("setTimezoneOverride(%s) failed: %s", fp.timezone_id, exc) + + if fp.locale: + try: + await self.tab.send(emu.set_locale_override(locale=fp.locale)) + applied["locale"] = fp.locale + except Exception as exc: # noqa: BLE001 + logger.warning("setLocaleOverride(%s) failed: %s", fp.locale, exc) + + # User-Agent / Accept-Language / platform share one CDP call. We only + # issue it when at least one of those is requested, and we always pass a + # user_agent (the current one if unchanged) because the param is required. + accept_language = fp.effective_accept_language + if fp.user_agent or fp.platform or accept_language: + try: + current_ua = await self.tab.evaluate("navigator.userAgent") + except Exception: # noqa: BLE001 + current_ua = None + ua_string = fp.user_agent or (current_ua if isinstance(current_ua, str) else None) + if ua_string: + metadata = None + if fp.user_agent or fp.platform: + hints = await self._read_client_hints() + try: + # Build even when live hints are empty: a custom UA + # falls back to a synthesized brand set so UA-CH is never + # blanked (which is itself a strong bot signal). 
+ metadata = self._build_ua_metadata( + hints or {}, + platform_override=fp.platform, + ua_string=fp.user_agent, + ) + except Exception as exc: # noqa: BLE001 + logger.debug("UA metadata build failed: %s", exc) + kwargs: dict[str, Any] = {"user_agent": ua_string} + if accept_language: + kwargs["accept_language"] = accept_language + if fp.platform: + kwargs["platform"] = fp.platform + if metadata is not None: + kwargs["user_agent_metadata"] = metadata + try: + await self.tab.send(emu.set_user_agent_override(**kwargs)) + if accept_language: + applied["accept_language"] = accept_language + if fp.user_agent: + applied["user_agent"] = ua_string + if fp.platform: + applied["platform"] = fp.platform + except Exception as exc: # noqa: BLE001 + logger.warning("setUserAgentOverride failed: %s", exc) + + if fp.latitude is not None and fp.longitude is not None: + try: + await self.tab.send( + emu.set_geolocation_override( + latitude=fp.latitude, + longitude=fp.longitude, + accuracy=fp.geo_accuracy if fp.geo_accuracy is not None else 50.0, + ) + ) + # The override only supplies coordinates; the page still needs + # the geolocation permission or getCurrentPosition() times out. + # Granting it browser-wide mirrors a user who allowed location. 
+ await self._grant_geolocation_permission() + applied["geolocation"] = {"latitude": fp.latitude, "longitude": fp.longitude} + except Exception as exc: # noqa: BLE001 + logger.warning("setGeolocationOverride failed: %s", exc) + + if fp.hardware_concurrency is not None: + try: + await self.tab.send( + emu.set_hardware_concurrency_override( + hardware_concurrency=int(fp.hardware_concurrency) + ) + ) + applied["hardware_concurrency"] = int(fp.hardware_concurrency) + except Exception as exc: # noqa: BLE001 + logger.warning("setHardwareConcurrencyOverride failed: %s", exc) + + if fp.has_device_metrics: + try: + await self.tab.send( + emu.set_device_metrics_override( + width=int(fp.screen_width), + height=int(fp.screen_height), + device_scale_factor=float(fp.device_scale_factor or 1.0), + mobile=bool(fp.mobile), + # Without screen_width/height, only the viewport + # (innerWidth/innerHeight) changes while screen.width/ + # height keep the host values -> innerWidth can exceed + # screen.width, an impossible, easily-flagged state. + screen_width=int(fp.screen_width), + screen_height=int(fp.screen_height), + ) + ) + applied["device_metrics"] = { + "width": int(fp.screen_width), + "height": int(fp.screen_height), + "device_scale_factor": float(fp.device_scale_factor or 1.0), + "mobile": bool(fp.mobile), + } + except Exception as exc: # noqa: BLE001 + logger.warning("setDeviceMetricsOverride failed: %s", exc) + + if fp.max_touch_points is not None: + try: + await self.tab.send( + emu.set_touch_emulation_enabled( + enabled=int(fp.max_touch_points) > 0, + max_touch_points=int(fp.max_touch_points) or 1, + ) + ) + applied["max_touch_points"] = int(fp.max_touch_points) + except Exception as exc: # noqa: BLE001 + logger.warning("setTouchEmulationEnabled failed: %s", exc) + + # JS-only overrides (no CDP equivalent): deviceMemory and WebGL strings. 
+ document_js = self._fingerprint_document_js(fp) + if document_js: + try: + await self.add_script_on_new_document(document_js) + except Exception as exc: # noqa: BLE001 + logger.debug("fingerprint new-document script failed: %s", exc) + try: + await self.tab.evaluate(document_js) + except Exception as exc: # noqa: BLE001 + logger.debug("fingerprint immediate eval failed: %s", exc) + if fp.device_memory is not None: + applied["device_memory"] = fp.device_memory + if fp.webgl_vendor or fp.webgl_renderer: + applied["webgl"] = { + "vendor": fp.webgl_vendor, + "renderer": fp.webgl_renderer, + } + + self.fingerprint = self.fingerprint.merged_with(fp) + logger.info("Applied fingerprint overrides: %s", sorted(applied)) + return applied + + async def _grant_geolocation_permission(self) -> None: + """Grant geolocation permission for the active tab's browser context. + + Sent over the browser-level connection (Browser-domain command) and + scoped to the tab's ``browserContextId`` so the grant actually applies + to the context the page lives in β€” otherwise the page keeps prompting. + """ + if self.browser is None: + return + connection = getattr(self.browser, "connection", None) + if connection is None: + return + context_id = None + target = getattr(self.tab, "target", None) + if target is not None: + context_id = getattr(target, "browser_context_id", None) + try: + await connection.send( + cdp_browser.grant_permissions( + permissions=[cdp_browser.PermissionType.GEOLOCATION], + browser_context_id=context_id, + ) + ) + except Exception as exc: # noqa: BLE001 + logger.debug("grantPermissions(geolocation) failed: %s", exc) + + def _worker_bootstrap_js(self, fp: FingerprintConfig) -> str: + """JS run *inside* each worker to re-assert JS-only navigator props. 
+ + CDP timezone/locale/hardwareConcurrency overrides already reach workers, + but navigator.language(s) (ignored by --lang on macOS) and + navigator.deviceMemory (no CDP override at all) do not, so a worker would + otherwise read host values and trip a main-vs-worker mismatch. + """ + lines: list[str] = [] + if fp.languages: + lines.append( + "Object.defineProperty(p,'languages',{get:function(){return %s;},configurable:true});" + % json.dumps(fp.languages) + ) + lines.append( + "Object.defineProperty(p,'language',{get:function(){return %s;},configurable:true});" + % json.dumps(fp.primary_language or fp.languages[0]) + ) + if fp.device_memory is not None: + lines.append( + "Object.defineProperty(p,'deviceMemory',{get:function(){return %s;},configurable:true});" + % json.dumps(fp.device_memory) + ) + # hardwareConcurrency: CDP setHardwareConcurrencyOverride covers the main + # thread but NOT workers, so re-assert it here for worker consistency. + if fp.hardware_concurrency is not None: + lines.append( + "Object.defineProperty(p,'hardwareConcurrency',{get:function(){return %s;},configurable:true});" + % json.dumps(int(fp.hardware_concurrency)) + ) + nav_block = ( + "try{var p=Object.getPrototypeOf(navigator);" + "".join(lines) + "}catch(e){}" + if lines + else "" + ) + # OffscreenCanvas WebGL lives in the worker too; without the same + # getParameter patch a worker reports the real GPU while the main thread + # reports the spoofed one -> CreepJS flags the mismatch. The WebGL patch + # depends on __nrMask, so pull in the native-toString shim here as well. + webgl_block = self._webgl_patch_js(fp) + parts: list[str] = [] + if webgl_block: + parts.append(self._NATIVE_MASK_PREAMBLE) + if nav_block: + parts.append(nav_block) + if webgl_block: + parts.append(webgl_block) + return "".join(parts) + + def _webgl_patch_js(self, fp: FingerprintConfig) -> str: + """getParameter override for UNMASKED vendor/renderer (assumes __nrMask). 
+ + Uses ``self.*`` so the same source works in a document and in a worker + (OffscreenCanvas) global scope. + """ + if not (fp.webgl_vendor or fp.webgl_renderer): + return "" + vendor = json.dumps(fp.webgl_vendor or "") + renderer = json.dumps(fp.webgl_renderer or "") + return ( + """ + try { + const V = %s, R = %s; + const patch = (proto) => { + if (!proto || !proto.getParameter) return; + const orig = proto.getParameter; + const wrapped = function getParameter(p) { + if (V && p === 37445) return V; // UNMASKED_VENDOR_WEBGL + if (R && p === 37446) return R; // UNMASKED_RENDERER_WEBGL + return orig.apply(this, arguments); + }; + __nrMask(wrapped, "getParameter"); + proto.getParameter = wrapped; + }; + patch(self.WebGLRenderingContext && self.WebGLRenderingContext.prototype); + patch(self.WebGL2RenderingContext && self.WebGL2RenderingContext.prototype); + } catch (e) {} + """ + % (vendor, renderer) + ) + + # Shared preamble: makes any function we patch report + # `function () { [native code] }` via a LOCAL own-`toString` per fn. + # + # History: this used to install a GLOBAL `Function.prototype.toString` shim + # (backed by a WeakMap) so even `Function.prototype.toString.call(fn)` read + # native. That defeats the strongest probe, BUT globally reassigning + # `Function.prototype.toString` is ITSELF a strong CreepJS tell -- it + # cascaded into ~9 component "lies" (Timezone/WebGL/Canvas/Audio/Math/...), + # taking a spoofed session from 1 lie to 10 (measured). A local own-toString + # leaves the global pristine: `fn.toString()` / `fn + ''` read native (the + # common checks) while only the rarer `Function.prototype.toString.call(fn)` + # of a specific patched member can still reveal it -- an accepted depth-layer + # gap, far cheaper than tripping 9 lies. Call sites are unchanged + # (`__nrMask(fn, name)`). 
+ _NATIVE_MASK_PREAMBLE = """ + const __nrMask = (fn, name) => { + try { + const ts = function toString() { return "function " + name + "() { [native code] }"; }; + Object.defineProperty(fn, "toString", { value: ts, configurable: true, writable: true }); + } catch (e) {} + return fn; + }; + """ + + def _fingerprint_document_js(self, fp: FingerprintConfig) -> str | None: + """Build the JS for properties Chromium has no CDP override for.""" + blocks: list[str] = [] + worker_boot = self._worker_bootstrap_js(fp) + wants_webgl = bool(fp.webgl_vendor or fp.webgl_renderer) + # The native-toString mask must be defined before any patched function. + if worker_boot or wants_webgl: + blocks.append(self._NATIVE_MASK_PREAMBLE) + # Wrap the classic Worker constructor so every worker first re-asserts + # the JS-only navigator props (language(s), deviceMemory) before running + # its real script (loaded transparently via importScripts). + if worker_boot: + blocks.append( + """ + try { + const BOOT = %s; + const NativeWorker = self.Worker; + if (NativeWorker && !NativeWorker.__nrPatched) { + const Wrapped = function Worker(url, options) { + try { + if (!options || options.type !== 'module') { + const abs = new URL(url, self.location.href).href; + const src = BOOT + ";importScripts(" + JSON.stringify(abs) + ");"; + const burl = URL.createObjectURL( + new Blob([src], { type: 'text/javascript' }) + ); + return new NativeWorker(burl, options); + } + } catch (e) {} + return new NativeWorker(url, options); + }; + Wrapped.prototype = NativeWorker.prototype; + Wrapped.__nrPatched = true; + __nrMask(Wrapped, "Worker"); + self.Worker = Wrapped; + } + } catch (e) {} + """ + % json.dumps(worker_boot) + ) + if fp.device_memory is not None: + blocks.append( + "try{Object.defineProperty(navigator,'deviceMemory'," + f"{{get:()=>{json.dumps(fp.device_memory)},configurable:true}});}}catch(e){{}}" + ) + if wants_webgl: + blocks.append(self._webgl_patch_js(fp)) + if not blocks: + return None + return 
"(()=>{" + "".join(blocks) + "})();" + + async def apply_timezone_override(self, timezone_id: str) -> None: + """Pin the JS timezone via CDP ``Emulation.setTimezoneOverride``.""" + if not timezone_id or self.tab is None: + return + try: + await self.tab.send(cdp_emulation.set_timezone_override(timezone_id=timezone_id)) + self.timezone_id = timezone_id + logger.info("Applied timezone override: %s", timezone_id) + except Exception as exc: # noqa: BLE001 + logger.warning("Could not set timezone override (%s): %s", timezone_id, exc) diff --git a/mithwire/stealth/fingerprint.py b/mithwire/stealth/fingerprint.py new file mode 100644 index 0000000..32fbe8b --- /dev/null +++ b/mithwire/stealth/fingerprint.py @@ -0,0 +1,341 @@ +"""Browser identity / anti-detect fingerprint configuration. + +A :class:`FingerprintConfig` is a declarative description of the identity a +session should present: timezone, locale, language, geolocation, device +metrics, hardware hints, user agent, and (optionally) GPU strings. + +Design rule β€” prefer engine-level CDP ``Emulation.*`` overrides over JavaScript +injection. CDP overrides are applied inside Chromium itself, so they propagate +to *Web Workers* and to HTTP request headers. JS patches injected via +``Page.addScriptToEvaluateOnNewDocument`` only run on the main document, so a +worker reading the unpatched value produces an inconsistency that lie-detectors +(e.g. CreepJS) flag. We therefore use CDP for everything Chromium supports and +fall back to JS only for the handful of properties with no CDP override +(``navigator.deviceMemory`` and, when explicitly requested, the WebGL vendor / +renderer strings). + +The whole point of this module is *internal consistency*: every signal a site +can read should agree with every other signal and with the proxy egress IP. A +mismatched override is worse than no override at all. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +# -------------------------------------------------------------------------- +# country -> language mapping +# -------------------------------------------------------------------------- +# ipapi.is returns a country_code but no language, so we derive a plausible +# Accept-Language / navigator.languages set from the country. Values are the +# locally dominant locale(s); English is appended as a near-universal fallback +# the way real browsers in these regions are commonly configured. +_COUNTRY_LANGUAGES: dict[str, list[str]] = { + "US": ["en-US", "en"], + "GB": ["en-GB", "en"], + "CA": ["en-CA", "fr-CA", "en"], + "AU": ["en-AU", "en"], + "IE": ["en-IE", "en"], + "NZ": ["en-NZ", "en"], + "DE": ["de-DE", "de", "en"], + "AT": ["de-AT", "de", "en"], + "CH": ["de-CH", "de", "fr", "en"], + "FR": ["fr-FR", "fr", "en"], + "BE": ["nl-BE", "fr-BE", "en"], + "NL": ["nl-NL", "nl", "en"], + "ES": ["es-ES", "es", "en"], + "MX": ["es-MX", "es", "en"], + "AR": ["es-AR", "es", "en"], + "CO": ["es-CO", "es", "en"], + "CL": ["es-CL", "es", "en"], + "IT": ["it-IT", "it", "en"], + "PT": ["pt-PT", "pt", "en"], + "BR": ["pt-BR", "pt", "en"], + "PL": ["pl-PL", "pl", "en"], + "RU": ["ru-RU", "ru", "en"], + "UA": ["uk-UA", "uk", "ru", "en"], + "SE": ["sv-SE", "sv", "en"], + "NO": ["nb-NO", "no", "en"], + "DK": ["da-DK", "da", "en"], + "FI": ["fi-FI", "fi", "sv", "en"], + "CZ": ["cs-CZ", "cs", "en"], + "GR": ["el-GR", "el", "en"], + "TR": ["tr-TR", "tr", "en"], + "RO": ["ro-RO", "ro", "en"], + "HU": ["hu-HU", "hu", "en"], + "JP": ["ja-JP", "ja", "en"], + "KR": ["ko-KR", "ko", "en"], + "CN": ["zh-CN", "zh", "en"], + "TW": ["zh-TW", "zh", "en"], + "HK": ["zh-HK", "zh", "en"], + "IN": ["en-IN", "hi-IN", "hi", "en"], + "ID": ["id-ID", "id", "en"], + "TH": ["th-TH", "th", "en"], + "VN": ["vi-VN", "vi", "en"], + "PH": ["en-PH", "fil-PH", "en"], + "MY": ["ms-MY", "en-MY", "en"], + "SG": 
["en-SG", "zh-SG", "en"], + "ZA": ["en-ZA", "af-ZA", "en"], + "AE": ["ar-AE", "ar", "en"], + "SA": ["ar-SA", "ar", "en"], + "IL": ["he-IL", "he", "en"], +} + +_DEFAULT_LANGUAGES = ["en-US", "en"] + + +def languages_for_country(country_code: str | None) -> list[str]: + if not country_code: + return list(_DEFAULT_LANGUAGES) + return list(_COUNTRY_LANGUAGES.get(country_code.strip().upper(), _DEFAULT_LANGUAGES)) + + +def strip_q_values(value: str) -> str: + """Drop ``;q=...`` weights from an Accept-Language-like string. + + Chromium's CDP ``acceptLanguage`` override expects a *plain* comma list and + re-derives the q-weights itself when it builds the header. If we pass a + pre-weighted string Chromium doubles the weights (``de;q=0.9;q=0.9``) and + leaks the literal ``;q=`` tokens into ``navigator.languages`` β€” a glaring + inconsistency. So we always normalise to a clean ``"de-DE,de,en"`` form. + """ + parts: list[str] = [] + for token in value.split(","): + lang = token.split(";")[0].strip() + if lang: + parts.append(lang) + return ",".join(parts) + + +def accept_language_csv(languages: list[str]) -> str: + """Clean comma list for CDP ``acceptLanguage`` (Chromium adds q-weights).""" + if not languages: + languages = list(_DEFAULT_LANGUAGES) + return ",".join(languages) + + +def _locale_from_languages(languages: list[str]) -> str | None: + return languages[0] if languages else None + + +@dataclass +class FingerprintConfig: + """Declarative identity description. All fields optional; unset = untouched.""" + + timezone_id: str | None = None + locale: str | None = None # BCP-47, e.g. 
"de-DE" + languages: list[str] | None = None # navigator.languages + accept_language: str | None = None # explicit header override + + latitude: float | None = None + longitude: float | None = None + geo_accuracy: float | None = None + + user_agent: str | None = None + platform: str | None = None # navigator.platform + hardware_concurrency: int | None = None + device_memory: int | None = None # GB: 0.25/0.5/1/2/4/8 + + screen_width: int | None = None + screen_height: int | None = None + device_scale_factor: float | None = None + mobile: bool | None = None + max_touch_points: int | None = None + + webgl_vendor: str | None = None + webgl_renderer: str | None = None + + # Free-form provenance (e.g. proxy egress country/city) for diagnostics. + source: dict[str, Any] = field(default_factory=dict) + + @property + def has_device_metrics(self) -> bool: + return ( + self.screen_width is not None + and self.screen_height is not None + ) + + @property + def is_empty(self) -> bool: + return not any( + value is not None + for value in ( + self.timezone_id, + self.locale, + self.languages, + self.accept_language, + self.latitude, + self.longitude, + self.user_agent, + self.platform, + self.hardware_concurrency, + self.device_memory, + self.screen_width, + self.screen_height, + self.device_scale_factor, + self.mobile, + self.max_touch_points, + self.webgl_vendor, + self.webgl_renderer, + ) + ) + + @property + def primary_language(self) -> str | None: + if self.languages: + return self.languages[0] + if self.locale: + return self.locale + return None + + @property + def effective_accept_language(self) -> str | None: + """Clean comma list to feed CDP ``acceptLanguage`` (no q-weights).""" + if self.accept_language: + return strip_q_values(self.accept_language) + if self.languages: + return accept_language_csv(self.languages) + return None + + def to_metadata(self) -> dict[str, Any]: + """Compact, JSON-safe view for session metadata / diagnostics.""" + data = { + "timezone_id": 
self.timezone_id, + "locale": self.locale, + "languages": self.languages, + "accept_language": self.effective_accept_language, + "latitude": self.latitude, + "longitude": self.longitude, + "user_agent": self.user_agent, + "platform": self.platform, + "hardware_concurrency": self.hardware_concurrency, + "device_memory": self.device_memory, + "screen": ( + { + "width": self.screen_width, + "height": self.screen_height, + "device_scale_factor": self.device_scale_factor, + "mobile": self.mobile, + "max_touch_points": self.max_touch_points, + } + if self.has_device_metrics or self.max_touch_points is not None + else None + ), + "webgl_vendor": self.webgl_vendor, + "webgl_renderer": self.webgl_renderer, + "source": self.source or None, + } + return {key: value for key, value in data.items() if value is not None} + + @classmethod + def from_dict(cls, raw: Any) -> "FingerprintConfig": + if raw is None: + return cls() + if isinstance(raw, FingerprintConfig): + return raw + if not isinstance(raw, dict): + raise ValueError("fingerprint must be an object/dict.") + + def _f(*keys: str) -> Any: + for key in keys: + if key in raw and raw[key] is not None: + return raw[key] + return None + + languages = _f("languages") + if isinstance(languages, str): + languages = [part.strip() for part in languages.split(",") if part.strip()] + elif isinstance(languages, list): + languages = [str(part).strip() for part in languages if str(part).strip()] + elif languages is not None: + raise ValueError("fingerprint.languages must be a list or comma string.") + + locale = _f("locale") + if locale is None and languages: + locale = _locale_from_languages(languages) + + def _num(value: Any) -> float | None: + if value is None: + return None + return float(value) + + def _int(value: Any) -> int | None: + if value is None: + return None + return int(value) + + screen = _f("screen") or {} + if not isinstance(screen, dict): + screen = {} + + return cls( + timezone_id=_f("timezone_id", "timezone", "tz"), 
+ locale=locale, + languages=languages, + accept_language=_f("accept_language"), + latitude=_num(_f("latitude", "lat")), + longitude=_num(_f("longitude", "lon", "lng")), + geo_accuracy=_num(_f("geo_accuracy", "accuracy")), + user_agent=_f("user_agent", "ua"), + platform=_f("platform"), + hardware_concurrency=_int(_f("hardware_concurrency", "cores")), + device_memory=_num(_f("device_memory", "ram")), + screen_width=_int(_f("screen_width") or screen.get("width")), + screen_height=_int(_f("screen_height") or screen.get("height")), + device_scale_factor=_num( + _f("device_scale_factor", "dpr") or screen.get("device_scale_factor") + ), + mobile=_f("mobile") if _f("mobile") is not None else screen.get("mobile"), + max_touch_points=_int(_f("max_touch_points") or screen.get("max_touch_points")), + webgl_vendor=_f("webgl_vendor"), + webgl_renderer=_f("webgl_renderer"), + source=_f("source") or {}, + ) + + @classmethod + def from_ipapi(cls, data: dict[str, Any]) -> "FingerprintConfig": + """Build an identity from an ``api.ipapi.is`` response. + + Aligns timezone, locale/language, and geolocation to the (proxy) egress + IP so the presented identity is internally consistent with the network + path the traffic actually takes. 
+ """ + location = data.get("location") or {} + country_code = location.get("country_code") + languages = languages_for_country(country_code) + config = cls( + timezone_id=location.get("timezone"), + locale=_locale_from_languages(languages), + languages=languages, + accept_language=accept_language_csv(languages), + latitude=location.get("latitude"), + longitude=location.get("longitude"), + geo_accuracy=50.0, + source={ + "exit_ip": data.get("ip"), + "country": location.get("country"), + "country_code": country_code, + "city": location.get("city"), + "timezone": location.get("timezone"), + }, + ) + return config + + def merged_with(self, override: "FingerprintConfig") -> "FingerprintConfig": + """Return a copy where any set field of ``override`` wins.""" + base = self + out = FingerprintConfig() + for f_name in ( + "timezone_id", "locale", "languages", "accept_language", + "latitude", "longitude", "geo_accuracy", "user_agent", "platform", + "hardware_concurrency", "device_memory", "screen_width", + "screen_height", "device_scale_factor", "mobile", "max_touch_points", + "webgl_vendor", "webgl_renderer", + ): + ov = getattr(override, f_name) + setattr(out, f_name, ov if ov is not None else getattr(base, f_name)) + merged_source = dict(base.source or {}) + merged_source.update(override.source or {}) + out.source = merged_source + return out diff --git a/tests/test_connect_retry.py b/tests/test_connect_retry.py index 4544b64..5dd3ef2 100644 --- a/tests/test_connect_retry.py +++ b/tests/test_connect_retry.py @@ -75,10 +75,15 @@ def _patches(): "asyncio.create_subprocess_exec", AsyncMock(return_value=_FakeProc()), ), - # `attach()` opens a CDP websocket; `update_targets()` issues CDP calls. - # Neither is the unit under test; stub both to no-ops. + # `attach()` opens a CDP websocket; `update_targets()` issues CDP calls; + # `_apply_stealth()` drives CDP overrides + new-document scripts against + # the live tab. 
None of the three is the unit under test (the connect + # loop), and all require a real browser/websocket, so stub them to + # no-ops -- otherwise they raise on the mocked CDP and `_apply_stealth` + # would also charge its real ~1.2s settle sleep into the timing asserts. patch.object(browser_mod.Browser, "attach", AsyncMock()), patch.object(browser_mod.Browser, "update_targets", AsyncMock()), + patch.object(browser_mod.Browser, "_apply_stealth", AsyncMock()), ) @@ -91,8 +96,8 @@ async def test_succeeds_after_slow_port_bind(self) -> None: stub = _http_api_failing_n_times(5) config = uc.Config(headless=True, sandbox=True) - sub_patch, attach_patch, update_patch = _patches() - with sub_patch, attach_patch, update_patch, \ + sub_patch, attach_patch, update_patch, stealth_patch = _patches() + with sub_patch, attach_patch, update_patch, stealth_patch, \ patch.object(browser_mod, "HTTPApi", stub): t0 = time.monotonic() browser = await browser_mod.Browser.create(config) @@ -116,8 +121,8 @@ async def test_persistent_failure_raises_within_budget(self) -> None: # so within a bounded (~10-12s) wall-clock budget. 
config = uc.Config(headless=True, sandbox=True) - sub_patch, attach_patch, update_patch = _patches() - with sub_patch, attach_patch, update_patch, \ + sub_patch, attach_patch, update_patch, stealth_patch = _patches() + with sub_patch, attach_patch, update_patch, stealth_patch, \ patch.object(browser_mod, "HTTPApi", _AlwaysFailHTTPApi): t0 = time.monotonic() with self.assertRaises(Exception) as ctx: @@ -137,8 +142,8 @@ async def test_immediate_success_is_fast(self) -> None: stub = _http_api_failing_n_times(0) config = uc.Config(headless=True, sandbox=True) - sub_patch, attach_patch, update_patch = _patches() - with sub_patch, attach_patch, update_patch, \ + sub_patch, attach_patch, update_patch, stealth_patch = _patches() + with sub_patch, attach_patch, update_patch, stealth_patch, \ patch.object(browser_mod, "HTTPApi", stub): t0 = time.monotonic() browser = await browser_mod.Browser.create(config)