-
Notifications
You must be signed in to change notification settings - Fork 74
apollo_http_server: bound in-flight add_tx requests with a semaphore to shed flood load #14566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| use std::clone::Clone; | ||
| use std::net::SocketAddr; | ||
| use std::string::String; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
|
|
||
| use apollo_config_manager_types::communication::SharedConfigManagerClient; | ||
|
|
@@ -36,6 +37,7 @@ use starknet_api::serde_utils::{ | |
| use starknet_api::transaction::fields::ValidResourceBounds; | ||
| use tokio::net::TcpListener; | ||
| use tokio::sync::watch::{channel, Receiver, Sender}; | ||
| use tokio::sync::Semaphore; | ||
| use tokio::time; | ||
| use tower_http::decompression::RequestDecompressionLayer; | ||
| use tower_http::limit::RequestBodyLimitLayer; | ||
|
|
@@ -62,6 +64,11 @@ pub type HttpServerResult<T> = Result<T, HttpServerError>; | |
|
|
||
| const CLIENT_REGION_HEADER: &str = "X-Client-Region"; | ||
|
|
||
| // Bounds the number of in-flight add_tx requests, each of which holds its payload and detached | ||
| // gateway task for the request's full lifetime. Aligned with the gateway component server's own | ||
| // concurrency bound. Excess requests are rejected with 503 rather than queued. | ||
| const DEFAULT_MAX_CONCURRENT_ADD_TX_REQUESTS: usize = 128; | ||
|
|
||
| pub struct HttpServer { | ||
| config: HttpServerConfig, | ||
| app_state: AppState, | ||
|
|
@@ -73,6 +80,10 @@ pub struct HttpServer { | |
| pub struct AppState { | ||
| gateway_client: SharedGatewayClient, | ||
| dynamic_config_rx: Receiver<HttpServerDynamicConfig>, | ||
| // Bounds the number of in-flight add_tx requests. A permit is acquired before the detached | ||
| // gateway task is spawned and held for that task's full lifetime, so the count reflects real | ||
| // in-flight work even when a client disconnects and the request future is dropped. | ||
| add_tx_semaphore: Arc<Semaphore>, | ||
| } | ||
|
|
||
| impl AppState { | ||
|
|
@@ -94,7 +105,8 @@ impl HttpServer { | |
| ) -> Self { | ||
| let (dynamic_config_tx, dynamic_config_rx) = | ||
| channel::<HttpServerDynamicConfig>(config.dynamic_config.clone()); | ||
| let app_state = AppState { gateway_client, dynamic_config_rx }; | ||
| let add_tx_semaphore = Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_ADD_TX_REQUESTS)); | ||
| let app_state = AppState { gateway_client, dynamic_config_rx, add_tx_semaphore }; | ||
| HttpServer { config, app_state, config_manager_client, dynamic_config_tx } | ||
| } | ||
|
|
||
|
|
@@ -299,16 +311,25 @@ async fn add_tx_inner( | |
| tx: RpcTransaction, | ||
| ) -> HttpServerResult<Json<GatewayOutput>> { | ||
| let gateway_input: GatewayInput = GatewayInput { rpc_tx: tx, message_metadata: None }; | ||
| // Bound concurrent in-flight requests: acquire the permit before spawning so excess requests | ||
| // are rejected with 503 rather than piling up as detached tasks under flood. | ||
| let permit = app_state | ||
| .add_tx_semaphore | ||
| .clone() | ||
| .try_acquire_owned() | ||
| .map_err(|_| HttpServerError::AtCapacityError())?; | ||
|
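There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When the semaphore is exhausted, this returns `HttpServerError::AtCapacityError` immediately via `?` […remainder of this review comment was lost in the page capture]. Useful? React with 👍 / 👎.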
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reply: Should be recorded to the failure metric.
Comment on lines
+316
to
+320
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Because the permit is acquired only inside `add_tx_inner`, […remainder of this review comment was lost in the page capture]. Useful? React with 👍 / 👎. |
||
| // Wrap the gateway client interaction with a tokio::spawn as it is NOT cancel-safe. | ||
| // Even if the current task is cancelled, e.g., when a request is dropped while still being | ||
| // processed, the inner task will continue to run. | ||
| // processed, the inner task will continue to run. The permit is moved into the spawned task so | ||
| // it is held for the task's full lifetime, decoupled from the (cancellable) request future. | ||
| let region = headers | ||
| .get(CLIENT_REGION_HEADER) | ||
| .and_then(|region| region.to_str().ok()) | ||
| .unwrap_or("N/A") | ||
| .to_string(); | ||
| let add_tx_result = tokio::spawn( | ||
| async move { | ||
| let _permit = permit; | ||
| let add_tx_result = app_state.gateway_client.add_tx(gateway_input).await.map_err(|e| { | ||
| debug!("Error while adding transaction: {}", e); | ||
| HttpServerError::from(Box::new(e)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hard-codes the HTTP add_tx limit to the default gateway concurrency instead of the configured gateway server/client limit. In deployments where
`components.gateway.max_concurrency` or `components.gateway.local_server_config.max_concurrency` is tuned away from 128, the HTTP server either sheds valid load too early or still sends more concurrent gateway requests than the gateway can process, defeating the stated alignment with the gateway component's own bound. Useful? React with 👍 / 👎.