diff --git a/application/execution_service.py b/application/execution_service.py index d53ec84..a746859 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -2,6 +2,7 @@ from __future__ import annotations +import math import hashlib import json import tempfile @@ -649,6 +650,37 @@ def execute_rebalance( insufficient_buying_power_symbols: list[str] = [] min_notional_symbols: list[str] = [] quantity_zero_symbols: list[str] = [] + anticipated_buying_power = get_available_buying_power( + ib, + account_values.get("buying_power", 0), + account_ids=normalized_account_ids, + ) + cash_sweep_quantity = 0 + cash_sweep_price = float(prices.get(safe_haven_symbol, 0.0) or 0.0) if safe_haven_symbol else 0.0 + dry_run_sale_proceeds = 0.0 + + def cash_sweep_sale_quantity_to_fund_buy(max_quantity: int, candidate_symbols: tuple[str, ...]) -> int: + if max_quantity <= 0 or not safe_haven_symbol or cash_sweep_price <= 0.0: + return 0 + base_buying_power = max(0.0, float(anticipated_buying_power)) + for symbol in candidate_symbols: + underweight_value = target_mv[symbol] - current_mv.get(symbol, 0.0) + if underweight_value <= threshold: + continue + ask = prices.get(symbol) + if not ask or ask <= 0.0: + continue + max_buy_quantity = int(underweight_value // ask) + if max_buy_quantity <= 0: + continue + required_buying_power = max_buy_quantity * ask * 1.0 + if base_buying_power >= required_buying_power: + return 0 + return min( + max_quantity, + max(1, math.ceil((required_buying_power - base_buying_power) / cash_sweep_price)), + ) + return 0 has_sell_plan = False for symbol in all_symbols: @@ -672,11 +704,27 @@ def execute_rebalance( break quantity_zero_symbols.append(symbol) - anticipated_buying_power = get_available_buying_power( - ib, - account_values.get("buying_power", 0), - account_ids=normalized_account_ids, - ) + funding_buy_candidates = [ + symbol + for symbol in target_mv + if symbol != safe_haven_symbol + and (target_mv[symbol] - current_mv.get(symbol, 0.0)) > threshold + and abs(target_mv[symbol] - current_mv.get(symbol, 0.0)) > minimum_order_notional + ] + if ( + not has_sell_plan + and funding_buy_candidates + and safe_haven_symbol + and cash_sweep_price > 0.0 + and float(positions.get(safe_haven_symbol, {}).get("quantity", 0.0) or 0.0) > 0.0 + ): + cash_sweep_quantity = cash_sweep_sale_quantity_to_fund_buy( + int(float(positions.get(safe_haven_symbol, {}).get("quantity", 0.0) or 0.0)), + tuple(funding_buy_candidates), + ) + if cash_sweep_quantity > 0: + has_sell_plan = True + has_buy_plan = False for symbol, target in target_mv.items(): current = current_mv.get(symbol, 0.0) @@ -813,8 +861,24 @@ def execute_rebalance( for symbol in all_symbols: current = current_mv.get(symbol, 0) target = target_mv.get(symbol, 0) - if current > target + threshold: - price = prices.get(symbol) + price = prices.get(symbol) + if symbol == safe_haven_symbol and cash_sweep_quantity > 0: + if not price: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "sell", "reason": "missing_price"}) + execution_summary["skipped_reasons"].append(f"missing_price:{symbol}") + continue + regular_qty = _sell_order_quantity( + current_value=current, + target_value=target, + price=price, + position_quantity=positions.get(symbol, {}).get("quantity", 0), + quantity_step=order_quantity_step, + ) + qty = max(int(cash_sweep_quantity), int(regular_qty)) + if qty <= 0: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "sell", "reason": "quantity_zero"}) + continue + elif current > target + threshold: if not price: execution_summary["orders_skipped"].append({"symbol": symbol, "side": "sell", "reason": "missing_price"}) execution_summary["skipped_reasons"].append(f"missing_price:{symbol}") @@ -829,52 +893,59 @@ def execute_rebalance( if qty <= 0: execution_summary["orders_skipped"].append({"symbol": symbol, "side": "sell", "reason": "quantity_zero"}) continue + else: + continue - if dry_run_only: - execution_summary["orders_submitted"].append( - {"symbol": symbol, "side": "sell", "quantity": qty, "status": "dry_run"} - ) - trade_logs.append(f"DRY_RUN sell {symbol} {format_quantity(qty)}") - continue - report = submit_order_intent( + if dry_run_only: + execution_summary["orders_submitted"].append( + {"symbol": symbol, "side": "sell", "quantity": qty, "status": "dry_run"} + ) + trade_logs.append(f"DRY_RUN sell {symbol} {format_quantity(qty)}") + dry_run_sale_proceeds += float(qty) * float(price) + continue + report = submit_order_intent( + ib, + order_intent_cls( + symbol=symbol, + side="sell", + quantity=qty, + account_id=order_account_id, + ), + ) + ok, status_msg = check_order_submitted(report, translator=translator) + status = str(getattr(report, "status", "") or "") + order_payload = { + "symbol": symbol, + "side": "sell", + "quantity": qty, + "status": status, + "broker_order_id": getattr(report, "broker_order_id", None), + } + if status == "Filled": + execution_summary["orders_filled"].append(order_payload) + elif status in {"PartiallyFilled", "Partial"}: + execution_summary["orders_partially_filled"].append(order_payload) + elif ok: + execution_summary["orders_submitted"].append(order_payload) + else: + execution_summary["orders_skipped"].append({**order_payload, "reason": status or "submit_failed"}) + execution_summary["skipped_reasons"].append(f"submit_failed:{symbol}:{status or 'unknown'}") + trade_logs.append(translator("market_sell", symbol=symbol, qty=format_quantity(qty)) + f" {status_msg}") + if ok: + sell_executed = True + + if dry_run_only: + buying_power = max(0.0, anticipated_buying_power + dry_run_sale_proceeds) + else: + if sell_executed: + time.sleep(sell_settle_delay_sec) + buying_power = get_available_buying_power( ib, - order_intent_cls( - symbol=symbol, - side="sell", - quantity=qty, - account_id=order_account_id, - ), + account_values.get("buying_power", 0), + account_ids=normalized_account_ids, ) - ok, status_msg = check_order_submitted(report, translator=translator) - status = str(getattr(report, "status", "") or "") - order_payload = { - "symbol": symbol, - "side": "sell", - "quantity": qty, - "status": status, - "broker_order_id": getattr(report, "broker_order_id", None), - } - if status == "Filled": - execution_summary["orders_filled"].append(order_payload) - elif status in {"PartiallyFilled", "Partial"}: - execution_summary["orders_partially_filled"].append(order_payload) - elif ok: - execution_summary["orders_submitted"].append(order_payload) - else: - execution_summary["orders_skipped"].append({**order_payload, "reason": status or "submit_failed"}) - execution_summary["skipped_reasons"].append(f"submit_failed:{symbol}:{status or 'unknown'}") - trade_logs.append(translator("market_sell", symbol=symbol, qty=format_quantity(qty)) + f" {status_msg}") - if ok: - sell_executed = True - - if sell_executed: - time.sleep(sell_settle_delay_sec) - - buying_power = anticipated_buying_power if not sell_executed else get_available_buying_power( - ib, - account_values.get("buying_power", 0), - account_ids=normalized_account_ids, - ) + else: + buying_power = anticipated_buying_power for symbol, target in target_mv.items(): current = current_mv.get(symbol, 0) diff --git a/main.py b/main.py index 346773b..ac4ed31 100644 --- a/main.py +++ b/main.py @@ -641,6 +641,25 @@ def handle_request(): result=cycle_result.result, ) return cycle_result.result, 200 + except TimeoutError as exc: + append_runtime_report_error( + report, + stage="ibkr_connect", + message=str(exc), + error_type=type(exc).__name__, + ) + finalize_runtime_report(report, status="error") + log_runtime_event( + log_context, + "ibkr_gateway_connect_timeout", + message="IBKR gateway handshake timed out", + severity="ERROR", + error_type=type(exc).__name__, + error_message=str(exc), + ) + error_msg = f"🚨 【IBKR 连接异常】\n{str(exc)}" + publish_notification(detailed_text=error_msg, compact_text=error_msg) + return "Error", 500 except Exception as exc: append_runtime_report_error( report, diff --git a/tests/test_connect_timeout_alert.py b/tests/test_connect_timeout_alert.py new file mode 100644 index 0000000..0a85e01 --- /dev/null +++ b/tests/test_connect_timeout_alert.py @@ -0,0 +1,190 @@ +import importlib +import sys +import types +import unittest +from datetime import timezone as datetime_timezone +from contextlib import contextmanager +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) +QPK_SRC = ROOT.parent / "QuantPlatformKit" / "src" +if str(QPK_SRC) not in sys.path: + sys.path.insert(0, str(QPK_SRC)) +UES_SRC = ROOT.parent / "UsEquityStrategies" / "src" +if str(UES_SRC) not in sys.path: + sys.path.insert(0, str(UES_SRC)) + + +@contextmanager +def install_stub_modules(): + flask_module = types.ModuleType("flask") + + class Flask: + def __init__(self, _name): + self._routes = {} + + def route(self, path, methods=None): + def decorator(func): + self._routes[(path, tuple(methods or []))] = func + return func + + return decorator + + def test_request_context(self, *_args, **_kwargs): + class _Context: + def __enter__(self_inner): + return self_inner + + def __exit__(self_inner, exc_type, exc, tb): + return False + + return _Context() + + def run(self, *args, **kwargs): + return None + + flask_module.Flask = Flask + flask_module.request = types.SimpleNamespace(method="POST", headers={}) + + requests_module = types.ModuleType("requests") + requests_module.post = lambda *args, **kwargs: None + + google_module = types.ModuleType("google") + google_module.__path__ = [] + google_auth_module = types.ModuleType("google.auth") + google_auth_module.default = lambda *args, **kwargs: (None, None) + google_module.auth = google_auth_module + + google_cloud_module = types.ModuleType("google.cloud") + google_cloud_module.__path__ = [] + compute_v1_module = types.ModuleType("google.cloud.compute_v1") + google_cloud_module.compute_v1 = compute_v1_module + + pandas_module = types.ModuleType("pandas") + pandas_module.Timestamp = lambda value=None: value + + market_calendars_module = types.ModuleType("pandas_market_calendars") + market_calendars_module.get_calendar = lambda name: None + + pytz_module = types.ModuleType("pytz") + pytz_module.utc = datetime_timezone.utc + pytz_module.timezone = lambda _name: datetime_timezone.utc + + ib_insync_module = types.ModuleType("ib_insync") + ib_insync_module.IB = type("IB", (), {}) + ib_insync_module.Stock = type("Stock", (), {}) + ib_insync_module.MarketOrder = type("MarketOrder", (), {}) + ib_insync_module.LimitOrder = type("LimitOrder", (), {}) + + strategy_runtime_module = types.ModuleType("strategy_runtime") + strategy_runtime_module.load_strategy_runtime = lambda *_args, **_kwargs: types.SimpleNamespace( + entrypoint=lambda **_kwargs: None, + required_inputs=frozenset({"portfolio_snapshot"}), + status_icon="📈", + merged_runtime_config={"trend_ma_window": 150}, + runtime_config={"trend_ma_window": 150}, + managed_symbols=("TQQQ", "BOXX"), + runtime_adapter=types.SimpleNamespace( + runtime_policy=types.SimpleNamespace(signal_effective_after_trading_days=None), + available_inputs=frozenset({"derived_indicators", "portfolio_snapshot"}) + ), + cash_reserve_ratio=0.03, + evaluate=lambda **_kwargs: None, + ) + + runtime_config_support_module = types.ModuleType("runtime_config_support") + runtime_config_support_module.load_platform_runtime_settings = lambda **_kwargs: types.SimpleNamespace( + project_id=None, + secret_name="secret", + strategy_profile="tqqq_growth_income", + strategy_display_name="TQQQ Growth Income", + strategy_domain="us_equity", + account_group="default", + account_ids=("U18308207",), + service_name="interactive-brokers-platform", + ib_gateway_instance_name="127.0.0.1", + ib_gateway_zone=None, + ib_gateway_mode="paper", + ib_gateway_ip_mode="internal", + ib_client_id=1, + ib_connect_timeout_seconds=60, + connect_attempts=3, + connect_retry_delay_seconds=0, + client_id_retry_offset=100, + strategy_target_mode="paper", + strategy_artifact_dir="/tmp", + feature_snapshot_path=None, + feature_snapshot_manifest_path=None, + strategy_config_path=None, + strategy_config_source=None, + reconciliation_output_path=None, + dry_run_only=False, + quantity_step=1, + min_order_notional=0.0, + runtime_target={}, + notify_lang="en", + tg_token=None, + tg_chat_id="chat-id", + ibkr_feature_snapshot_manifest_path=None, + ibkr_reconciliation_output_path=None, + market_hours_source="cloud_run", + ) + runtime_config_support_module.resolve_ib_gateway_ip_mode = lambda *_args, **_kwargs: "internal" + + modules = { + "flask": flask_module, + "requests": requests_module, + "google": google_module, + "google.auth": google_auth_module, + "google.cloud": google_cloud_module, + "google.cloud.compute_v1": compute_v1_module, + "pandas": pandas_module, + "pandas_market_calendars": market_calendars_module, + "pytz": pytz_module, + "ib_insync": ib_insync_module, + "strategy_runtime": strategy_runtime_module, + "runtime_config_support": runtime_config_support_module, + } + original = {name: sys.modules.get(name) for name in modules} + sys.modules.update(modules) + try: + yield + finally: + for name, previous in original.items(): + if previous is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = previous + + +class IBRKConnectTimeoutAlertTests(unittest.TestCase): + def test_handle_request_sends_ibkr_connect_timeout_notification(self): + with install_stub_modules(): + sys.modules.pop("main", None) + module = importlib.import_module("main") + observed = {"messages": []} + + module.is_market_open_today = lambda: True + module.run_strategy_core = lambda **_kwargs: (_ for _ in ()).throw( + TimeoutError("IBKR API handshake timed out") + ) + module.persist_execution_report = lambda report: observed.setdefault("report", dict(report)) or "/tmp/report.json" + module.publish_notification = lambda *, detailed_text, compact_text: observed["messages"].append( + (detailed_text, compact_text) + ) + module.build_run_id = lambda: "run-001" + + with module.app.test_request_context("/", method="POST"): + body, status = module.handle_request() + + self.assertEqual(status, 500) + self.assertEqual(body, "Error") + self.assertEqual(observed["report"]["errors"][0]["stage"], "ibkr_connect") + self.assertIn("IBKR 连接异常", observed["messages"][0][0]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_execution_service.py b/tests/test_execution_service.py index 6bc4537..8043820 100644 --- a/tests/test_execution_service.py +++ b/tests/test_execution_service.py @@ -394,7 +394,7 @@ def fake_fetch_quote_snapshots(_ib, symbols): {"VOO": 0.8, "BOXX": 0.2}, {}, {"equity": 1000.0, "buying_power": 1000.0}, - dry_run_only=True, + dry_run_only=False, **kwargs, ) second_logs = execute_rebalance( @@ -402,7 +402,7 @@ def fake_fetch_quote_snapshots(_ib, symbols): {"VOO": 0.8, "BOXX": 0.2}, {}, {"equity": 1000.0, "buying_power": 1000.0}, - dry_run_only=True, + dry_run_only=False, **kwargs, ) paper_logs = execute_rebalance( @@ -416,7 +416,7 @@ def fake_fetch_quote_snapshots(_ib, symbols): assert any("execution_lock_acquired" in log for log in first_logs) assert any("same_day_execution_locked" in log for log in second_logs) - assert any("execution_lock_acquired" in log for log in paper_logs) + assert any("same_day_execution_locked" in log for log in paper_logs) def test_execute_rebalance_skips_when_same_day_fills_detected(): @@ -503,7 +503,7 @@ def fake_fetch_quote_snapshots(_ib, symbols): trade_date="2026-04-01", snapshot_as_of="2026-03-31", ), - dry_run_only=True, + dry_run_only=False, cash_reserve_ratio=0.0, rebalance_threshold_ratio=0.02, limit_buy_premium=1.005, @@ -514,12 +514,81 @@ def fake_fetch_quote_snapshots(_ib, symbols): assert any("execution_lock_acquired" in log for log in trade_logs) assert summary["execution_status"] == "executed" - assert summary["mode"] == "dry_run" + assert summary["mode"] == "paper" assert summary["safe_haven_symbol"] == "BOXX" assert summary["orders_submitted"] assert summary["target_vs_current"] +def test_execute_rebalance_sells_cash_sweep_symbol_when_buying_power_is_short(monkeypatch, tmp_path): + class FakeIB: + def __init__(self): + self._account_values_calls = 0 + + def openTrades(self): + return [] + + def fills(self): + return [] + + def accountValues(self): + self._account_values_calls += 1 + buying_power = "124" if self._account_values_calls == 1 else "324" + return [SimpleNamespace(tag="AvailableFunds", currency="USD", value=buying_power)] + + submitted = [] + + def fake_submit_order_intent(_ib, intent): + submitted.append(intent) + return SimpleNamespace(broker_order_id=f"order-{len(submitted)}", status="Submitted") + + monkeypatch.setattr("application.execution_service.time.sleep", lambda _seconds: None) + + trade_logs, summary = execute_rebalance( + FakeIB(), + {"VOO": 0.8, "BOXX": 0.2}, + {"VOO": {"quantity": 0}, "BOXX": {"quantity": 2}}, + {"equity": 1000.0, "buying_power": 124.0}, + fetch_quote_snapshots=lambda *_args, **_kwargs: { + "VOO": SimpleNamespace(last_price=100.0), + "BOXX": SimpleNamespace(last_price=100.0), + }, + submit_order_intent=fake_submit_order_intent, + order_intent_cls=OrderIntent, + translator=translate, + strategy_symbols=["VOO", "BOXX"], + strategy_profile="tech_communication_pullback_enhancement", + account_group="default", + service_name="ibkr-paper", + account_ids=("DU123",), + signal_metadata=_signal_metadata( + {"VOO": 0.8, "BOXX": 0.2}, + risk_symbols=("VOO",), + safe_haven_symbols=("BOXX",), + regime="risk_on", + breadth_ratio=0.6, + target_stock_weight=0.8, + realized_stock_weight=0.8, + safe_haven_weight=0.2, + safe_haven_symbol="BOXX", + trade_date="2026-04-01", + snapshot_as_of="2026-03-31", + ), + dry_run_only=False, + cash_reserve_ratio=0.0, + rebalance_threshold_ratio=0.02, + limit_buy_premium=1.005, + sell_settle_delay_sec=0, + execution_lock_dir=tmp_path, + return_summary=True, + ) + + assert any(intent.side == "sell" and intent.symbol == "BOXX" for intent in submitted) + assert any(intent.side == "buy" and intent.symbol == "VOO" for intent in submitted) + assert summary["execution_status"] == "executed" + assert any(log.startswith("sell BOXX") for log in trade_logs) + + def test_execute_rebalance_blocks_when_material_target_has_missing_prices(): class FakeIB: def openTrades(self):