Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/apollo_http_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ serde.workspace = true
serde_json.workspace = true
starknet_api.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt"] }
tokio = { workspace = true, features = ["rt", "sync"] }
tower-http = { workspace = true, features = ["decompression-full", "limit"] }
tracing.workspace = true

Expand Down
16 changes: 16 additions & 0 deletions crates/apollo_http_server/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub enum HttpServerRunError {
/// Errors that may occur during the runtime of the HTTP server.
#[derive(Error, Debug)]
pub enum HttpServerError {
#[error("Server is at capacity; too many in-flight transactions.")]
AtCapacityError(),
#[error(transparent)]
ConfigManagerClientError(#[from] ConfigManagerClientError),
#[error(transparent)]
Expand All @@ -40,6 +42,7 @@ pub enum HttpServerError {
impl IntoResponse for HttpServerError {
fn into_response(self) -> Response {
match self {
HttpServerError::AtCapacityError() => at_capacity_error_into_response(),
HttpServerError::ConfigManagerClientError(e) => {
config_manager_client_err_into_response(e)
}
Expand Down Expand Up @@ -79,6 +82,19 @@ fn serde_error_into_response(err: serde_json::Error) -> Response {
(response_code, response_body).into_response()
}

/// Builds the response sent when the server refuses a transaction because it is at capacity.
///
/// Logs the rejection at debug level and answers with `503 Service Unavailable`, using a
/// serialized `StarknetError` as the response body.
fn at_capacity_error_into_response() -> Response {
    debug!("Rejecting transaction: server is at capacity.");
    let starknet_error = StarknetError {
        code: StarknetErrorCode::UnknownErrorCode("Server at capacity.".to_string()),
        message: "Server at capacity, please retry later.".to_string(),
    };
    let body = serialize_error(&starknet_error);
    (StatusCode::SERVICE_UNAVAILABLE, body).into_response()
}

fn disabled_error_into_response() -> Response {
debug!("Server is configured to reject transactions.");
let (response_code, starknet_error) = (
Expand Down
25 changes: 23 additions & 2 deletions crates/apollo_http_server/src/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::clone::Clone;
use std::net::SocketAddr;
use std::string::String;
use std::sync::Arc;
use std::time::Duration;

use apollo_config_manager_types::communication::SharedConfigManagerClient;
Expand Down Expand Up @@ -36,6 +37,7 @@ use starknet_api::serde_utils::{
use starknet_api::transaction::fields::ValidResourceBounds;
use tokio::net::TcpListener;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::Semaphore;
use tokio::time;
use tower_http::decompression::RequestDecompressionLayer;
use tower_http::limit::RequestBodyLimitLayer;
Expand All @@ -62,6 +64,11 @@ pub type HttpServerResult<T> = Result<T, HttpServerError>;

const CLIENT_REGION_HEADER: &str = "X-Client-Region";

// Upper bound on the number of in-flight add_tx requests. Every in-flight request holds its
// payload and its detached gateway task for the request's full lifetime, so this caps that
// footprint. Requests beyond the bound are rejected with 503 rather than queued.
// NOTE(review): the value is meant to match the gateway component server's own concurrency
// bound, but it is a fixed constant here rather than read from configuration; confirm the two
// stay aligned when the gateway limit is tuned.
const DEFAULT_MAX_CONCURRENT_ADD_TX_REQUESTS: usize = 128;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use configured gateway concurrency for capacity

This hard-codes the HTTP add_tx limit to the default gateway concurrency instead of the configured gateway server/client limit. In deployments where components.gateway.max_concurrency or components.gateway.local_server_config.max_concurrency is tuned away from 128, the HTTP server either sheds valid load too early or still sends more concurrent gateway requests than the gateway can process, defeating the stated alignment with the gateway component's own bound.

Useful? React with 👍 / 👎.


pub struct HttpServer {
config: HttpServerConfig,
app_state: AppState,
Expand All @@ -73,6 +80,10 @@ pub struct HttpServer {
/// State made available to the HTTP request handlers.
pub struct AppState {
// Client used to forward transactions to the gateway.
gateway_client: SharedGatewayClient,
// Receiving side of the watch channel carrying the current `HttpServerDynamicConfig`.
dynamic_config_rx: Receiver<HttpServerDynamicConfig>,
// Bounds the number of in-flight add_tx requests. A permit is acquired before the detached
// gateway task is spawned and held for that task's full lifetime, so the count reflects real
// in-flight work even when a client disconnects and the request future is dropped.
add_tx_semaphore: Arc<Semaphore>,
}

impl AppState {
Expand All @@ -94,7 +105,8 @@ impl HttpServer {
) -> Self {
let (dynamic_config_tx, dynamic_config_rx) =
channel::<HttpServerDynamicConfig>(config.dynamic_config.clone());
let app_state = AppState { gateway_client, dynamic_config_rx };
let add_tx_semaphore = Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_ADD_TX_REQUESTS));
let app_state = AppState { gateway_client, dynamic_config_rx, add_tx_semaphore };
HttpServer { config, app_state, config_manager_client, dynamic_config_tx }
}

Expand Down Expand Up @@ -299,16 +311,25 @@ async fn add_tx_inner(
tx: RpcTransaction,
) -> HttpServerResult<Json<GatewayOutput>> {
let gateway_input: GatewayInput = GatewayInput { rpc_tx: tx, message_metadata: None };
// Bound concurrent in-flight requests: acquire the permit before spawning so excess requests
// are rejected with 503 rather than piling up as detached tasks under flood.
let permit = app_state
.add_tx_semaphore
.clone()
.try_acquire_owned()
.map_err(|_| HttpServerError::AtCapacityError())?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Count capacity rejections as failed transactions

When the semaphore is exhausted, this returns AtCapacityError after the handlers have already incremented ADDED_TRANSACTIONS_TOTAL, but it bypasses record_added_transactions / increment_failure_metrics. During flood shedding, every 503 capacity rejection is therefore counted as received but not as failed, which makes the HTTP failure-rate alert and success/failure dashboards underreport exactly the overload condition this path is meant to expose.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rejection should also be recorded in the failure metric.

Comment on lines +316 to +320

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Move the capacity check before parsing request bodies

Because the permit is acquired only inside add_tx_inner, requests reaching this path have already paid the expensive body work: the RPC endpoint's Json(tx) extractor has deserialized the transaction, and the deprecated endpoint has read the full String, parsed it, and converted it before calling here. Under overload with 128 gateway calls in flight, additional max-size or compressed requests are still read and parsed before being rejected with 503, so this does not actually bound in-flight HTTP add_tx payload work during the flood scenario this change is intended to shed.

Useful? React with 👍 / 👎.

// Wrap the gateway client interaction with a tokio::spawn as it is NOT cancel-safe.
// Even if the current task is cancelled, e.g., when a request is dropped while still being
// processed, the inner task will continue to run.
// processed, the inner task will continue to run. The permit is moved into the spawned task so
// it is held for the task's full lifetime, decoupled from the (cancellable) request future.
let region = headers
.get(CLIENT_REGION_HEADER)
.and_then(|region| region.to_str().ok())
.unwrap_or("N/A")
.to_string();
let add_tx_result = tokio::spawn(
async move {
let _permit = permit;
let add_tx_result = app_state.gateway_client.add_tx(gateway_input).await.map_err(|e| {
debug!("Error while adding transaction: {}", e);
HttpServerError::from(Box::new(e))
Expand Down
139 changes: 134 additions & 5 deletions crates/apollo_http_server/src/http_server_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;

use apollo_gateway_types::communication::{GatewayClientError, MockGatewayClient};
use apollo_gateway_types::communication::{
GatewayClient,
GatewayClientError,
GatewayClientResult,
MockGatewayClient,
};
use apollo_gateway_types::deprecated_gateway_error::{
KnownStarknetErrorCode,
StarknetError,
Expand All @@ -11,13 +17,17 @@ use apollo_gateway_types::errors::GatewayError;
use apollo_gateway_types::gateway_types::{
DeclareGatewayOutput,
DeployAccountGatewayOutput,
GatewayInput,
GatewayOutput,
InvokeGatewayOutput,
};
use apollo_http_server_config::config::HttpServerDynamicConfig;
use apollo_infra::component_client::ClientError;
use apollo_proc_macros::unique_u16;
use assert_matches::assert_matches;
use async_trait::async_trait;
use axum::body::Bytes;
use axum::http::HeaderMap;
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use http::StatusCode;
Expand All @@ -28,11 +38,11 @@ use starknet_api::test_utils::read_json_file;
use starknet_api::transaction::TransactionHash;
use starknet_api::{class_hash, contract_address, tx_hash};
use starknet_types_core::felt::Felt;
use tokio::sync::watch;
use tokio::sync::{oneshot, watch, Notify, Semaphore};
use tracing_test::traced_test;

use crate::errors::HttpServerError;
use crate::http_server::{is_ready, AppState, CLIENT_REGION_HEADER};
use crate::http_server::{add_rpc_tx, add_tx_inner, is_ready, AppState, CLIENT_REGION_HEADER};
use crate::test_utils::{
deprecated_gateway_declare_tx,
deprecated_gateway_deploy_account_tx,
Expand Down Expand Up @@ -118,8 +128,11 @@ async fn allow_new_txs() {
async fn is_ready_reflects_accept_new_txs() {
let (tx, dynamic_config_rx) =
watch::channel(HttpServerDynamicConfig { accept_new_txs: true, ..Default::default() });
let app_state =
AppState { gateway_client: Arc::new(MockGatewayClient::new()), dynamic_config_rx };
let app_state = AppState {
gateway_client: Arc::new(MockGatewayClient::new()),
dynamic_config_rx,
add_tx_semaphore: Arc::new(Semaphore::new(1)),
};

// Clone AppState to mirror how axum's Extension extractor hands a clone to each request:
// updates to the watch channel must still be observed through the cloned receiver.
Expand Down Expand Up @@ -453,6 +466,122 @@ async fn zstd_compressed_request_too_large() {
assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
}

/// A gateway client whose `add_tx` blocks until `release` is signalled, used to hold a permit
/// in-flight deterministically. It signals `started` once entered and `completed` right before
/// it returns.
struct BlockingGatewayClient {
// Notified as soon as `add_tx` is entered.
started: Arc<Notify>,
// `add_tx` waits on this before finishing; the test notifies it to let the call proceed.
release: Arc<Notify>,
// One-shot sender fired just before `add_tx` returns. Kept as `Mutex<Option<_>>` so it can be
// taken through `&self` and sent at most once.
completed: tokio::sync::Mutex<Option<oneshot::Sender<()>>>,
}

#[async_trait]
impl GatewayClient for BlockingGatewayClient {
    /// Announces entry through `started`, parks until `release` is notified, fires the one-shot
    /// `completed` signal if it has not been consumed yet, and returns the default output.
    async fn add_tx(&self, _gateway_input: GatewayInput) -> GatewayClientResult<GatewayOutput> {
        self.started.notify_one();
        self.release.notified().await;

        let completion_sender = self.completed.lock().await.take();
        if let Some(sender) = completion_sender {
            // The receiving side may already be gone; that is not a failure for this test double.
            let _ = sender.send(());
        }

        Ok(default_gateway_output())
    }
}

/// Test helper: assembles an `AppState` from the given gateway client and semaphore, with a
/// dynamic config that accepts new transactions.
///
/// The watch sender is returned next to the state so the caller can keep the channel open or
/// publish config updates.
fn app_state_with_semaphore(
    gateway_client: Arc<dyn GatewayClient>,
    add_tx_semaphore: Arc<Semaphore>,
) -> (watch::Sender<HttpServerDynamicConfig>, AppState) {
    let initial_config = HttpServerDynamicConfig { accept_new_txs: true, ..Default::default() };
    let (config_tx, dynamic_config_rx) = watch::channel(initial_config);
    let app_state = AppState { gateway_client, dynamic_config_rx, add_tx_semaphore };
    (config_tx, app_state)
}

/// When all permits are taken, a new add_tx request is rejected with `AtCapacityError` rather than
/// spawning more in-flight work.
#[tokio::test]
async fn add_tx_inner_rejects_when_at_capacity() {
let semaphore = Arc::new(Semaphore::new(1));
let (_tx, app_state) =
app_state_with_semaphore(Arc::new(MockGatewayClient::new()), semaphore.clone());

// Exhaust the single permit, simulating one in-flight request.
let _held_permit = semaphore.clone().acquire_owned().await.unwrap();

let result = add_tx_inner(app_state, HeaderMap::new(), rpc_invoke_tx()).await;
assert_matches!(result, Err(HttpServerError::AtCapacityError()));
}

/// `AtCapacityError` is rendered as HTTP 503, which keeps it distinguishable from the
/// `accept_new_txs == false` rejection.
#[tokio::test]
async fn at_capacity_error_maps_to_503() {
    let status = HttpServerError::AtCapacityError().into_response().status();
    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
}

/// Once a successful request's spawned task finishes, its permit is returned and the full
/// capacity is available again for later requests.
#[tokio::test]
async fn permit_released_after_successful_add_tx() {
    let semaphore = Arc::new(Semaphore::new(1));

    // The gateway is expected to be called exactly once and to succeed.
    let mut gateway = MockGatewayClient::new();
    gateway.expect_add_tx().times(1).return_const(Ok(default_gateway_output()));

    let (_tx, app_state) = app_state_with_semaphore(Arc::new(gateway), Arc::clone(&semaphore));

    assert_eq!(semaphore.available_permits(), 1);
    let result = add_tx_inner(app_state, HeaderMap::new(), rpc_invoke_tx()).await;
    assert!(result.is_ok(), "{result:?}");
    assert_eq!(semaphore.available_permits(), 1, "permit should be released after completion");
}

/// The `accept_new_txs == false` gate is evaluated before any permit is requested: with an empty
/// semaphore the request still fails with `DisabledError`, not `AtCapacityError`.
#[tokio::test]
async fn disabled_gate_runs_before_acquiring_permit() {
    // No permits at all: reaching `try_acquire` would surface as `AtCapacityError`.
    let no_permits = Arc::new(Semaphore::new(0));
    let (config_tx, app_state) =
        app_state_with_semaphore(Arc::new(MockGatewayClient::new()), no_permits);

    // Flip the server into "reject new transactions" mode.
    let disabled_config = HttpServerDynamicConfig { accept_new_txs: false, ..Default::default() };
    config_tx.send(disabled_config).unwrap();

    let outcome = add_rpc_tx(Extension(app_state), HeaderMap::new(), Json(rpc_invoke_tx())).await;
    assert_matches!(outcome, Err(HttpServerError::DisabledError()));
}

/// Cancel-safety: if the request future is dropped mid-flight (client disconnect), the detached
/// gateway task still runs to completion and continues to hold its permit until it finishes.
///
/// The test drives a `BlockingGatewayClient` through three signals: `started` (the gateway call
/// was entered), `release` (the test lets the call finish) and `completed` (the call is about to
/// return). The order of the steps below is what makes the scenario deterministic.
#[tokio::test]
async fn add_tx_inner_preserves_cancel_safety_on_disconnect() {
let started = Arc::new(Notify::new());
let release = Arc::new(Notify::new());
let (completed_tx, completed_rx) = oneshot::channel();
let gateway_client = Arc::new(BlockingGatewayClient {
started: started.clone(),
release: release.clone(),
completed: tokio::sync::Mutex::new(Some(completed_tx)),
});
// A single permit, so "held" and "released" are directly observable via `available_permits`.
let semaphore = Arc::new(Semaphore::new(1));
let (_tx, app_state) = app_state_with_semaphore(gateway_client, semaphore.clone());

// Run the request on its own task so it can be aborted independently of this test body.
let request_future = tokio::spawn(add_tx_inner(app_state, HeaderMap::new(), rpc_invoke_tx()));

// Wait until the detached gateway task is running; its permit is now held.
// `Notify::notify_one` stores a permit when no task is waiting yet, so this wait does not miss a
// notification that was sent before it started.
started.notified().await;
assert_eq!(semaphore.available_permits(), 0, "permit should be held while in-flight");

// Simulate a client disconnect: drop the request future.
request_future.abort();

// The detached task must still run to completion despite the disconnect.
release.notify_one();
completed_rx.await.expect("gateway task must complete after the request future was dropped");

// The permit is held for the inner task's full lifetime, then released.
// `completed` fires just before `add_tx` returns, so the permit may still be held at this
// point; the bounded `acquire` waits for the release and fails the test instead of hanging if
// the permit is never returned. The acquired permit is discarded with `forget`; the semaphore
// is not used again afterwards.
tokio::time::timeout(Duration::from_secs(5), semaphore.acquire())
.await
.expect("permit should be released after the detached task completes")
.expect("semaphore should not be closed")
.forget();
}

#[tokio::test]
async fn zstd_decompressed_request_too_large() {
// 10 KB of repeated bytes — compresses to ~50 bytes with zstd.
Expand Down
Loading