Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ Recommended account-group config payload:
}
```

If you later add live, keep it as a separate live group such as `live-main` with `ib_gateway_mode=live` and its own `account_ids`.
For live multi-account rollout, keep one Cloud Run service per live account group. Each live group should carry exactly one `account_ids` value so portfolio reads, pending/fill guards, and submitted IBKR orders are all routed to that account.

See [`docs/examples/ibkr-account-groups.paper.json`](docs/examples/ibkr-account-groups.paper.json) for a ready-to-edit starter example, and [`docs/ibkr_runtime_rollout.md`](docs/ibkr_runtime_rollout.md) for the exact rollout steps to get `ACCOUNT_GROUP=paper` running.

Expand Down Expand Up @@ -403,6 +403,8 @@ IB_GATEWAY_IP_MODE=internal

仓库里也提供了一个可以直接改的起始样例:[`docs/examples/ibkr-account-groups.paper.json`](docs/examples/ibkr-account-groups.paper.json)。如果你要按 `ACCOUNT_GROUP=paper` 先落地,直接看 [`docs/ibkr_runtime_rollout.md`](docs/ibkr_runtime_rollout.md)。

实盘多账户建议一个 UID 对应一个 Cloud Run 服务和一个账号组。每个实盘账号组只放一个 `account_ids` 值;运行时会用它过滤持仓、pending/fill 检查,并把同一个 UID 写进 IBKR 订单的 `order.account`。

当前行为改成了 fail-fast:

- 没有 `STRATEGY_PROFILE` → 启动直接报错
Expand Down
105 changes: 98 additions & 7 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,44 @@ def check_order_submitted(report, *, translator):
return False, f"❌ {translator('failed', reason=status)}"


def get_available_buying_power(ib, fallback_buying_power):
def _normalize_account_ids(account_ids=None) -> tuple[str, ...]:
if account_ids is None:
return ()
if isinstance(account_ids, str):
candidates = [account_ids]
else:
candidates = list(account_ids)
normalized = []
for candidate in candidates:
text = str(candidate or "").strip()
if text:
normalized.append(text)
return tuple(dict.fromkeys(normalized))


def _matches_account(account_id: str | None, selected_account_ids: tuple[str, ...]) -> bool:
if not selected_account_ids:
return True
return str(account_id or "").strip() in selected_account_ids


def _resolve_order_account_id(account_ids=None) -> str | None:
normalized = _normalize_account_ids(account_ids)
if len(normalized) > 1:
raise ValueError(
"IBKR live order routing requires a single account_id per runtime service; "
f"got {len(normalized)} account_ids."
)
return normalized[0] if normalized else None


def get_available_buying_power(ib, fallback_buying_power, *, account_ids=None):
selected_account_ids = _normalize_account_ids(account_ids)
buying_power = fallback_buying_power
for account_value in ib.accountValues():
account_id = str(getattr(account_value, "account", "") or "").strip() or None
if not _matches_account(account_id, selected_account_ids):
continue
if account_value.tag == "AvailableFunds" and account_value.currency == "USD":
buying_power = float(account_value.value)
return buying_power
Expand Down Expand Up @@ -101,9 +136,24 @@ def _extract_open_order_status(order_like: Any) -> str:
return str(status or "").strip()


def _collect_pending_symbols(ib, symbols: set[str]) -> tuple[str, ...]:
def _extract_open_order_account(order_like: Any) -> str | None:
order = getattr(order_like, "order", None)
for candidate in (
getattr(order, "account", None),
getattr(order_like, "account", None),
):
text = str(candidate or "").strip()
if text:
return text
return None


def _collect_pending_symbols(ib, symbols: set[str], *, account_ids=None) -> tuple[str, ...]:
selected_account_ids = _normalize_account_ids(account_ids)
pending = []
for order_like in _iter_open_orders(ib):
if not _matches_account(_extract_open_order_account(order_like), selected_account_ids):
continue
status = _extract_open_order_status(order_like)
if status in {"Cancelled", "ApiCancelled", "Inactive", "Filled"}:
continue
Expand All @@ -127,6 +177,19 @@ def _extract_fill_symbol(fill_like: Any) -> str | None:
return symbol_text or None


def _extract_fill_account(fill_like: Any) -> str | None:
execution = getattr(fill_like, "execution", None)
for candidate in (
getattr(execution, "acctNumber", None),
getattr(execution, "account", None),
getattr(fill_like, "account", None),
):
text = str(candidate or "").strip()
if text:
return text
return None


def _normalize_date_like(value: Any) -> str | None:
if value in {None, ""}:
return None
Expand All @@ -150,11 +213,20 @@ def _extract_fill_date(fill_like: Any) -> str | None:
return None


def _collect_same_day_filled_symbols(ib, symbols: set[str], trade_date: str | None) -> tuple[str, ...]:
def _collect_same_day_filled_symbols(
ib,
symbols: set[str],
trade_date: str | None,
*,
account_ids=None,
) -> tuple[str, ...]:
if not trade_date:
return ()
selected_account_ids = _normalize_account_ids(account_ids)
matched = []
for fill_like in _iter_fills(ib):
if not _matches_account(_extract_fill_account(fill_like), selected_account_ids):
continue
symbol = _extract_fill_symbol(fill_like)
if not symbol or symbol not in symbols:
continue
Expand Down Expand Up @@ -425,9 +497,15 @@ def execute_rebalance(
safe_haven_symbols = tuple(allocation["safe_haven_symbols"])
safe_haven_symbol = safe_haven_symbols[0] if safe_haven_symbols else None
equity = account_values.get("equity", 0)
normalized_account_ids = _normalize_account_ids(account_ids)
order_account_id = _resolve_order_account_id(normalized_account_ids)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject multi-account IDs only when submitting real orders

execute_rebalance now calls _resolve_order_account_id at function start, so any account_ids with more than one value raises ValueError even in dry_run_only=True runs or other early-exit paths where no order would be sent. That regresses dry-run/validation workflows for multi-account groups and does not match the stated behavior of failing when the service actually attempts real order routing; the check should be deferred to the non-dry-run submission path (or gated by a live-order mode flag).

Useful? React with 👍 / 👎.

execution_summary = {
"mode": "dry_run" if dry_run_only else "paper",
"strategy_profile": strategy_profile,
"account_group": account_group,
"account_ids": list(normalized_account_ids),
"order_account_id": order_account_id,
"service_name": service_name,
"trade_date": trade_date,
"snapshot_as_of": snapshot_date,
"safe_haven_symbol": safe_haven_symbol,
Expand Down Expand Up @@ -508,7 +586,7 @@ def execute_rebalance(
(sum(current_mv.values()) - current_safe_haven_mv) / equity
)

pending_symbols = _collect_pending_symbols(ib, set(all_symbols))
pending_symbols = _collect_pending_symbols(ib, set(all_symbols), account_ids=normalized_account_ids)
if pending_symbols:
reason = f"pending_orders_detected:{','.join(pending_symbols)}"
execution_summary["execution_status"] = "blocked"
Expand Down Expand Up @@ -597,6 +675,7 @@ def execute_rebalance(
anticipated_buying_power = get_available_buying_power(
ib,
account_values.get("buying_power", 0),
account_ids=normalized_account_ids,
)
has_buy_plan = False
for symbol, target in target_mv.items():
Expand Down Expand Up @@ -655,7 +734,12 @@ def execute_rebalance(
trade_logs.append(translator("failed", reason=reason))
return _finalize_result(trade_logs, execution_summary, return_summary=return_summary)

same_day_filled_symbols = _collect_same_day_filled_symbols(ib, set(all_symbols), trade_date)
same_day_filled_symbols = _collect_same_day_filled_symbols(
ib,
set(all_symbols),
trade_date,
account_ids=normalized_account_ids,
)
if same_day_filled_symbols:
reason = f"same_day_fills_detected:{','.join(same_day_filled_symbols)}"
execution_summary["execution_status"] = "blocked"
Expand Down Expand Up @@ -685,7 +769,7 @@ def execute_rebalance(
strategy_profile=strategy_profile,
account_group=account_group,
service_name=service_name,
account_ids=tuple(account_ids or ()),
account_ids=normalized_account_ids,
trade_date=trade_date,
snapshot_date=snapshot_date,
target_hash=target_hash,
Expand Down Expand Up @@ -754,7 +838,12 @@ def execute_rebalance(
continue
report = submit_order_intent(
ib,
order_intent_cls(symbol=symbol, side="sell", quantity=qty),
order_intent_cls(
symbol=symbol,
side="sell",
quantity=qty,
account_id=order_account_id,
),
)
ok, status_msg = check_order_submitted(report, translator=translator)
status = str(getattr(report, "status", "") or "")
Expand Down Expand Up @@ -784,6 +873,7 @@ def execute_rebalance(
buying_power = anticipated_buying_power if not sell_executed else get_available_buying_power(
ib,
account_values.get("buying_power", 0),
account_ids=normalized_account_ids,
)

for symbol, target in target_mv.items():
Expand Down Expand Up @@ -830,6 +920,7 @@ def execute_rebalance(
order_type="limit",
limit_price=limit_price,
time_in_force="DAY",
account_id=order_account_id,
),
)
ok, status_msg = check_order_submitted(report, translator=translator)
Expand Down
2 changes: 2 additions & 0 deletions application/ibkr_order_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def submit_order_intent(
ib: Any,
order_intent: OrderIntent,
*,
account_id: str | None = None,
wait_seconds: float = 1.0,
stock_factory: Callable[..., Any] | None = None,
market_order_factory: Callable[..., Any] | None = None,
Expand All @@ -50,6 +51,7 @@ def submit_order_intent(
return _submit_order_intent(
ib,
intent,
account_id=account_id,
wait_seconds=wait_seconds,
stock_factory=stock_factory,
market_order_factory=_market_order_factory_with_time_in_force(
Expand Down
9 changes: 7 additions & 2 deletions application/runtime_broker_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class IBKRRuntimeBrokerAdapters:
sleep_fn: Any
printer: Any = print

def fetch_account_portfolio_snapshot(self, ib):
if self.account_ids:
return self.fetch_portfolio_snapshot_fn(ib, account_ids=self.account_ids)
return self.fetch_portfolio_snapshot_fn(ib)

def connect_ib(self):
self.ensure_event_loop_fn()
host = self.host_resolver()
Expand Down Expand Up @@ -82,7 +87,7 @@ def connect_ib(self):
raise last_error

def get_current_portfolio(self, ib):
snapshot = self.fetch_portfolio_snapshot_fn(ib)
snapshot = self.fetch_account_portfolio_snapshot(ib)
positions = {}
for position in snapshot.positions:
positions[position.symbol] = {
Expand All @@ -97,7 +102,7 @@ def get_current_portfolio(self, ib):

def build_portfolio_snapshot(self, ib, *, get_current_portfolio_fallback=None):
if hasattr(ib, "reqPositions"):
return self.fetch_portfolio_snapshot_fn(ib)
return self.fetch_account_portfolio_snapshot(ib)
positions, account_values = get_current_portfolio_fallback(ib)
return PortfolioSnapshot(
as_of=datetime.now(timezone.utc),
Expand Down
2 changes: 1 addition & 1 deletion docs/ibkr_runtime_rollout.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ cp docs/examples/ibkr-account-groups.paper.json /tmp/ibkr-account-groups.json
- `ib_gateway_ip_mode`:推荐 `internal`。
- `ib_client_id`:这个账号组对应的 client id。
- `service_name`:当前只是预留元数据,建议先填成现有 Cloud Run 服务名,后面多账号拆服务时更顺。
- `account_ids`:当前主要是留档和后续扩展,不是启动必填
- `account_ids`:实盘账号组建议只放一个 UID。运行时会用它过滤持仓、pending/fill 检查,并写入 IBKR 订单的 `order.account`;如果一个服务配置多个 UID,实盘下单会因为账户路由不明确而失败

把 secret 建起来:

Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
flask
gunicorn
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@8769362096227320bc05c791b5244d4b3e88db50
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@ed55a6af0245323dbed82060e89be96d8f77f756
quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@c6b22288e1447e56cd51a93ea9138e4f1110f165
us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@dd8aca2bea31196c07fd07efba286266478d29e9
pandas
numpy
requests
Expand Down
7 changes: 6 additions & 1 deletion strategy_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@ def _runtime_adapter_with_portfolio(
def _fetch_portfolio_snapshot_for_context(self, ib, *, required: bool) -> Any | None:
if ib is None and not required:
return None
account_ids = tuple(self.runtime_settings.account_ids or ())
if required:
if account_ids:
return fetch_portfolio_snapshot(ib, account_ids=account_ids)
return fetch_portfolio_snapshot(ib)
try:
if account_ids:
return fetch_portfolio_snapshot(ib, account_ids=account_ids)
return fetch_portfolio_snapshot(ib)
except Exception as exc:
self.logger(
Expand Down Expand Up @@ -245,7 +250,7 @@ def _evaluate_value_target_strategy(
runtime_config = dict(self.runtime_config)
runtime_config.setdefault("translator", translator)
apply_runtime_policy_to_runtime_config(runtime_config, self.runtime_adapter)
portfolio_snapshot = fetch_portfolio_snapshot(ib)
portfolio_snapshot = self._fetch_portfolio_snapshot_for_context(ib, required=True)
market_inputs = self._build_value_target_market_inputs(
ib=ib,
historical_close_loader=historical_close_loader,
Expand Down
Loading