|
| 1 | +""" |
| 2 | +Canonical binary serializer for KeepKey EVM signed metadata. |
| 3 | +
|
| 4 | +Produces the exact binary format that firmware's parse_metadata_binary() expects. |
| 5 | +Used for generating test vectors and by the Pioneer signing service. |
| 6 | +
|
| 7 | +Binary format: |
| 8 | + version(1) + chain_id(4 BE) + contract_address(20) + selector(4) + |
| 9 | + tx_hash(32) + method_name_len(2 BE) + method_name(var) + num_args(1) + |
| 10 | + [per arg: name_len(1) + name(var) + format(1) + value_len(2 BE) + value(var)] + |
| 11 | + classification(1) + timestamp(4 BE) + key_id(1) + signature(64) + recovery(1) |
| 12 | +""" |
| 13 | + |
| 14 | +import struct |
| 15 | +import hashlib |
| 16 | +import time |
| 17 | + |
| 18 | +# Keep in sync with firmware signed_metadata.h |
| 19 | +ARG_FORMAT_RAW = 0 |
| 20 | +ARG_FORMAT_ADDRESS = 1 |
| 21 | +ARG_FORMAT_AMOUNT = 2 |
| 22 | +ARG_FORMAT_BYTES = 3 |
| 23 | + |
| 24 | +CLASSIFICATION_OPAQUE = 0 |
| 25 | +CLASSIFICATION_VERIFIED = 1 |
| 26 | +CLASSIFICATION_MALFORMED = 2 |
| 27 | + |
| 28 | +# ── Test key derivation (BIP-39 + SignIdentity path) ────────────────── |
| 29 | +# Uses KeepKey's standard SignIdentity operation for key derivation. |
| 30 | +# Any KeepKey loaded with the same mnemonic derives the same key. |
| 31 | +# |
| 32 | +# Identity fields (what SignIdentity receives): |
| 33 | +# proto: "ssh" — selects raw SHA256 signing (no prefix wrapping) |
| 34 | +# host: "keepkey.com" — the domain |
| 35 | +# path: "/insight" — the purpose |
| 36 | +# index: 0-3 — key slot |
| 37 | +# |
| 38 | +# The proto="ssh" is an internal detail that selects the firmware's |
| 39 | +# sshMessageSign() code path (SHA256 + secp256k1, no prefix). |
| 40 | +# Users interact with host + path only. |
| 41 | + |
| 42 | +# Test mnemonic — loaded from INSIGHT_MNEMONIC env var, or falls back to |
| 43 | +# the standard BIP-39 test vector. CI uses the test vector; production |
| 44 | +# signing uses the env var which is never committed to source. |
| 45 | +import os as _os |
| 46 | +TEST_MNEMONIC = _os.environ.get('INSIGHT_MNEMONIC', |
| 47 | + 'abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about') |
| 48 | + |
| 49 | +# Identity fields — must match pioneer-insight keygen exactly |
| 50 | +INSIGHT_IDENTITY = { |
| 51 | + 'proto': 'ssh', |
| 52 | + 'host': 'keepkey.com', |
| 53 | + 'path': '/insight', |
| 54 | +} |
| 55 | + |
| 56 | +def _identity_fingerprint(identity, index): |
| 57 | + """Match firmware's cryptoIdentityFingerprint() exactly. |
| 58 | +
|
| 59 | + Firmware order: index(4 LE) + proto + "://" + host + path |
| 60 | + """ |
| 61 | + import struct as _s |
| 62 | + ctx = hashlib.sha256() |
| 63 | + ctx.update(_s.pack('<I', index)) |
| 64 | + if identity.get('proto'): |
| 65 | + ctx.update(identity['proto'].encode()) |
| 66 | + ctx.update(b'://') |
| 67 | + if identity.get('user'): |
| 68 | + ctx.update(identity['user'].encode()) |
| 69 | + ctx.update(b'@') |
| 70 | + if identity.get('host'): |
| 71 | + ctx.update(identity['host'].encode()) |
| 72 | + if identity.get('port'): |
| 73 | + ctx.update(b':') |
| 74 | + ctx.update(identity['port'].encode()) |
| 75 | + if identity.get('path'): |
| 76 | + ctx.update(identity['path'].encode()) |
| 77 | + return ctx.digest() |
| 78 | + |
| 79 | +def _derive_hardened(parent_key, parent_chain, index): |
| 80 | + """BIP-32 hardened child derivation.""" |
| 81 | + import hmac as _hmac |
| 82 | + data = b'\x00' + parent_key + struct.pack('>I', index) |
| 83 | + I = _hmac.new(parent_chain, data, 'sha512').digest() |
| 84 | + il = int.from_bytes(I[:32], 'big') |
| 85 | + pk = int.from_bytes(parent_key, 'big') |
| 86 | + n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 |
| 87 | + child = (pk + il) % n |
| 88 | + return child.to_bytes(32, 'big'), I[32:] |
| 89 | + |
| 90 | +def _mnemonic_to_seed(mnemonic, passphrase=''): |
| 91 | + import hmac as _hmac |
| 92 | + pw = mnemonic.encode('utf-8') |
| 93 | + salt = ('mnemonic' + passphrase).encode('utf-8') |
| 94 | + return hashlib.pbkdf2_hmac('sha512', pw, salt, 2048, dklen=64) |
| 95 | + |
| 96 | +def _derive_insight_key(mnemonic, slot=0): |
| 97 | + """Derive the signing key matching KeepKey's SignIdentity for insight.""" |
| 98 | + import hmac as _hmac |
| 99 | + seed = _mnemonic_to_seed(mnemonic) |
| 100 | + I = _hmac.new(b'Bitcoin seed', seed, 'sha512').digest() |
| 101 | + key, chain = I[:32], I[32:] |
| 102 | + |
| 103 | + # Path: m/13'/hash[0..3]'/hash[4..7]'/hash[8..11]'/hash[12..15]' |
| 104 | + fp = _identity_fingerprint(INSIGHT_IDENTITY, slot) |
| 105 | + path = [ |
| 106 | + 0x80000000 | 13, |
| 107 | + 0x80000000 | int.from_bytes(fp[0:4], 'little'), |
| 108 | + 0x80000000 | int.from_bytes(fp[4:8], 'little'), |
| 109 | + 0x80000000 | int.from_bytes(fp[8:12], 'little'), |
| 110 | + 0x80000000 | int.from_bytes(fp[12:16], 'little'), |
| 111 | + ] |
| 112 | + |
| 113 | + for idx in path: |
| 114 | + key, chain = _derive_hardened(key, chain, idx) |
| 115 | + |
| 116 | + return key |
| 117 | + |
| 118 | +# Derive the test private key from the standard test mnemonic |
| 119 | +TEST_PRIVATE_KEY = _derive_insight_key(TEST_MNEMONIC, slot=0) |
| 120 | + |
| 121 | + |
| 122 | +def serialize_metadata( |
| 123 | + chain_id: int, |
| 124 | + contract_address: bytes, |
| 125 | + selector: bytes, |
| 126 | + tx_hash: bytes, |
| 127 | + method_name: str, |
| 128 | + args: list, |
| 129 | + classification: int = CLASSIFICATION_VERIFIED, |
| 130 | + timestamp: int = None, |
| 131 | + key_id: int = 0, |
| 132 | + version: int = 1, |
| 133 | +) -> bytes: |
| 134 | + """Serialize metadata fields into canonical binary (unsigned). |
| 135 | +
|
| 136 | + Args: |
| 137 | + chain_id: EIP-155 chain ID |
| 138 | + contract_address: 20-byte contract address |
| 139 | + selector: 4-byte function selector |
| 140 | + tx_hash: 32-byte keccak-256 of unsigned tx (can be zeroed for phase 1) |
| 141 | + method_name: UTF-8 method name (max 64 bytes) |
| 142 | + args: list of dicts with keys: name, format, value (bytes) |
| 143 | + classification: 0=OPAQUE, 1=VERIFIED, 2=MALFORMED |
| 144 | + timestamp: Unix seconds (defaults to now) |
| 145 | + key_id: embedded public key slot (0-3) |
| 146 | + version: schema version (must be 1) |
| 147 | +
|
| 148 | + Returns: |
| 149 | + Canonical binary payload (without signature — call sign_metadata next) |
| 150 | + """ |
| 151 | + if timestamp is None: |
| 152 | + timestamp = int(time.time()) |
| 153 | + |
| 154 | + assert len(contract_address) == 20 |
| 155 | + assert len(selector) == 4 |
| 156 | + assert len(tx_hash) == 32 |
| 157 | + assert len(method_name.encode('utf-8')) <= 64 |
| 158 | + assert len(args) <= 8 |
| 159 | + |
| 160 | + buf = bytearray() |
| 161 | + |
| 162 | + # version |
| 163 | + buf.append(version) |
| 164 | + |
| 165 | + # chain_id (4 bytes BE) |
| 166 | + buf.extend(struct.pack('>I', chain_id)) |
| 167 | + |
| 168 | + # contract_address (20 bytes) |
| 169 | + buf.extend(contract_address) |
| 170 | + |
| 171 | + # selector (4 bytes) |
| 172 | + buf.extend(selector) |
| 173 | + |
| 174 | + # tx_hash (32 bytes) |
| 175 | + buf.extend(tx_hash) |
| 176 | + |
| 177 | + # method_name (2-byte length prefix + UTF-8) |
| 178 | + name_bytes = method_name.encode('utf-8') |
| 179 | + buf.extend(struct.pack('>H', len(name_bytes))) |
| 180 | + buf.extend(name_bytes) |
| 181 | + |
| 182 | + # num_args |
| 183 | + buf.append(len(args)) |
| 184 | + |
| 185 | + # args |
| 186 | + for arg in args: |
| 187 | + # name (1-byte length prefix + UTF-8) |
| 188 | + arg_name = arg['name'].encode('utf-8') |
| 189 | + assert len(arg_name) <= 32 |
| 190 | + buf.append(len(arg_name)) |
| 191 | + buf.extend(arg_name) |
| 192 | + |
| 193 | + # format |
| 194 | + buf.append(arg['format']) |
| 195 | + |
| 196 | + # value (2-byte length prefix + raw bytes) |
| 197 | + val = arg['value'] |
| 198 | + assert len(val) <= 32 # METADATA_MAX_ARG_VALUE_LEN |
| 199 | + buf.extend(struct.pack('>H', len(val))) |
| 200 | + buf.extend(val) |
| 201 | + |
| 202 | + # classification |
| 203 | + buf.append(classification) |
| 204 | + |
| 205 | + # timestamp (4 bytes BE) |
| 206 | + buf.extend(struct.pack('>I', timestamp)) |
| 207 | + |
| 208 | + # key_id |
| 209 | + buf.append(key_id) |
| 210 | + |
| 211 | + return bytes(buf) |
| 212 | + |
| 213 | + |
| 214 | +def sign_metadata(payload: bytes, private_key: bytes = None) -> bytes: |
| 215 | + """Sign the canonical binary payload and return the complete signed blob. |
| 216 | +
|
| 217 | + Signs SHA-256(payload) with secp256k1 ECDSA, appends signature(64) + recovery(1). |
| 218 | +
|
| 219 | + Args: |
| 220 | + payload: canonical binary from serialize_metadata() |
| 221 | + private_key: 32-byte secp256k1 private key (defaults to test key) |
| 222 | +
|
| 223 | + Returns: |
| 224 | + Complete signed blob: payload + signature(64) + recovery(1) |
| 225 | + """ |
| 226 | + if private_key is None: |
| 227 | + private_key = TEST_PRIVATE_KEY |
| 228 | + |
| 229 | + digest = hashlib.sha256(payload).digest() |
| 230 | + |
| 231 | + try: |
| 232 | + from ecdsa import SigningKey, SECP256k1, util |
| 233 | + sk = SigningKey.from_string(private_key, curve=SECP256k1) |
| 234 | + sig_der = sk.sign_digest(digest, sigencode=util.sigencode_string) |
| 235 | + # sig_der is r(32) || s(32) = 64 bytes |
| 236 | + r = sig_der[:32] |
| 237 | + s = sig_der[32:] |
| 238 | + |
| 239 | + # Recovery: compute v (27 or 28) |
| 240 | + vk = sk.get_verifying_key() |
| 241 | + pubkey = b'\x04' + vk.to_string() |
| 242 | + # Try recovery with v=0 and v=1 |
| 243 | + from ecdsa import VerifyingKey |
| 244 | + for v in (0, 1): |
| 245 | + try: |
| 246 | + recovered = VerifyingKey.from_public_key_recovery_with_digest( |
| 247 | + sig_der, digest, SECP256k1, hashfunc=hashlib.sha256 |
| 248 | + ) |
| 249 | + for i, rk in enumerate(recovered): |
| 250 | + if rk.to_string() == vk.to_string(): |
| 251 | + recovery = 27 + i |
| 252 | + break |
| 253 | + else: |
| 254 | + recovery = 27 |
| 255 | + break |
| 256 | + except Exception: |
| 257 | + continue |
| 258 | + else: |
| 259 | + recovery = 27 |
| 260 | + |
| 261 | + except ImportError: |
| 262 | + # Fallback: zero signature for struct-only testing |
| 263 | + r = b'\x00' * 32 |
| 264 | + s = b'\x00' * 32 |
| 265 | + recovery = 27 |
| 266 | + |
| 267 | + return payload + r + s + bytes([recovery]) |
| 268 | + |
| 269 | + |
| 270 | +def build_test_metadata( |
| 271 | + chain_id=1, |
| 272 | + contract_address=None, |
| 273 | + selector=None, |
| 274 | + tx_hash=None, |
| 275 | + method_name='supply', |
| 276 | + args=None, |
| 277 | + key_id=3, # Slot 3: CI test key (DEBUG_LINK builds only) |
| 278 | + **kwargs, |
| 279 | +) -> bytes: |
| 280 | + """Convenience: build a complete signed test metadata blob. |
| 281 | +
|
| 282 | + Defaults to an Aave V3 supply() call on Ethereum mainnet. |
| 283 | + Uses key_id=1 (CI test slot) by default. |
| 284 | + """ |
| 285 | + if contract_address is None: |
| 286 | + contract_address = bytes.fromhex('7d2768de32b0b80b7a3454c06bdac94a69ddc7a9') |
| 287 | + if selector is None: |
| 288 | + selector = bytes.fromhex('617ba037') |
| 289 | + if tx_hash is None: |
| 290 | + tx_hash = b'\x00' * 32 |
| 291 | + if args is None: |
| 292 | + args = [ |
| 293 | + { |
| 294 | + 'name': 'asset', |
| 295 | + 'format': ARG_FORMAT_ADDRESS, |
| 296 | + 'value': bytes.fromhex('6b175474e89094c44da98b954eedeac495271d0f'), |
| 297 | + }, |
| 298 | + { |
| 299 | + 'name': 'amount', |
| 300 | + 'format': ARG_FORMAT_AMOUNT, |
| 301 | + 'value': (10500000000000000000).to_bytes(32, 'big'), |
| 302 | + }, |
| 303 | + { |
| 304 | + 'name': 'onBehalfOf', |
| 305 | + 'format': ARG_FORMAT_ADDRESS, |
| 306 | + 'value': bytes.fromhex('d8da6bf26964af9d7eed9e03e53415d37aa96045'), |
| 307 | + }, |
| 308 | + ] |
| 309 | + |
| 310 | + payload = serialize_metadata( |
| 311 | + chain_id=chain_id, |
| 312 | + contract_address=contract_address, |
| 313 | + selector=selector, |
| 314 | + tx_hash=tx_hash, |
| 315 | + method_name=method_name, |
| 316 | + args=args, |
| 317 | + key_id=key_id, |
| 318 | + **kwargs, |
| 319 | + ) |
| 320 | + return sign_metadata(payload) |
0 commit comments