Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/apollo_propeller/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub struct Config {
pub stream_protocol: StreamProtocol,
/// Maximum size of a message sent over the wire.
pub max_wire_message_size: usize,
/// Capacity of the bounded channel between each handler and the engine for inbound units.
/// Controls back-pressure: when the channel is full, the handler stops reading from the
/// network, causing yamux flow control to slow the remote peer.
pub inbound_channel_capacity: usize,
}

impl Default for Config {
Expand All @@ -23,6 +27,7 @@ impl Default for Config {
stale_message_timeout: Duration::from_secs(120),
stream_protocol: StreamProtocol::new("/propeller/0.1.0"),
max_wire_message_size: 1 << 20, // 1 MB
inbound_channel_capacity: 16,
}
}
}
24 changes: 24 additions & 0 deletions crates/apollo_propeller/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;

use apollo_infra_utils::warn_every_n_ms;
use futures::stream::SelectAll;
use futures::StreamExt;
use libp2p::identity::{Keypair, PeerId, PublicKey};
use lru::LruCache;
use starknet_api::staking::StakingWeight;
Expand Down Expand Up @@ -47,6 +50,10 @@ type BroadcastResult = (Result<Vec<PropellerUnit>, UnitPublishError>, BroadcastR
// or blocks legitimate messages. To prevent this we must make sure that message processors that
// never get a correct signature (with the right nonce) are not counted in the
// messages_to_ignore_units_from cache, and don't update the nonce tracked for each peer.

/// A stream of `(PeerId, PropellerUnit)` from a single connection handler's bounded channel.
type InboundUnitStream = Pin<Box<dyn futures::Stream<Item = (PeerId, PropellerUnit)> + Send>>;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct MessageKey {
committee_id: CommitteeId,
Expand Down Expand Up @@ -82,6 +89,10 @@ pub enum EngineCommand {
HandleDisconnected {
peer_id: PeerId,
},
RegisterHandler {
peer_id: PeerId,
receiver: futures::channel::mpsc::Receiver<PropellerUnit>,
},
}

/// Outputs sent from the engine to the Behaviour.
Expand Down Expand Up @@ -119,6 +130,9 @@ pub struct Engine {
prepared_units_tx: mpsc::UnboundedSender<BroadcastResult>,
from_behaviour_rx: mpsc::UnboundedReceiver<EngineCommand>,
to_behaviour_tx: mpsc::UnboundedSender<EngineOutput>,
/// Receivers for inbound units from connection handlers, polled via `SelectAll`.
/// Each receiver corresponds to one connection's bounded channel.
inbound_unit_receivers: SelectAll<InboundUnitStream>,
metrics: Option<PropellerMetrics>,
}

Expand Down Expand Up @@ -160,6 +174,7 @@ impl Engine {
prepared_units_tx: broadcaster_results_tx,
from_behaviour_rx,
to_behaviour_tx: output_tx,
inbound_unit_receivers: SelectAll::new(),
metrics,
}
}
Expand Down Expand Up @@ -579,8 +594,17 @@ impl Engine {
},
EngineCommand::HandleConnected { peer_id } => self.handle_connected(peer_id),
EngineCommand::HandleDisconnected { peer_id } => self.handle_disconnected(peer_id),
EngineCommand::RegisterHandler { peer_id, receiver } => {
let tagged_stream: InboundUnitStream =
Box::pin(receiver.map(move |unit| (peer_id, unit)));
self.inbound_unit_receivers.push(tagged_stream);
}
},

Some((peer_id, unit)) = self.inbound_unit_receivers.next() => {
self.handle_unit(peer_id, unit);
}

Some((result, response)) = self.prepared_units_rx.recv() => {
self.handle_broadcaster_result(result, response);
}
Expand Down
Loading