diff --git a/crates/chain-orchestrator/Cargo.toml b/crates/chain-orchestrator/Cargo.toml
index a2c51183..8452e97c 100644
--- a/crates/chain-orchestrator/Cargo.toml
+++ b/crates/chain-orchestrator/Cargo.toml
@@ -69,10 +69,12 @@ alloy-transport.workspace = true
 # rollup-node
 scroll-db = { workspace = true, features = ["test-utils"] }
 rollup-node-primitives = { workspace = true, features = ["arbitrary"] }
+rollup-node-watcher = { workspace = true, features = ["test-utils"] }
 
 # scroll
 reth-scroll-chainspec.workspace = true
 reth-scroll-forks.workspace = true
+reth-scroll-node = { workspace = true, features = ["test-utils"] }
 
 # reth
 reth-eth-wire-types.workspace = true
diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs
index 504daaba..0056018e 100644
--- a/crates/chain-orchestrator/src/error.rs
+++ b/crates/chain-orchestrator/src/error.rs
@@ -36,10 +36,6 @@ pub enum ChainOrchestratorError {
     /// An L1 message was not found in the database.
     #[error("L1 message not found at {0}")]
     L1MessageNotFound(L1MessageKey),
-    /// A gap was detected in the L1 message queue: the previous message before index {0} is
-    /// missing.
-    #[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
-    L1MessageQueueGap(u64),
     /// An inconsistency was detected when trying to consolidate the chain.
     #[error("Chain inconsistency detected")]
     ChainInconsistency,
@@ -57,9 +53,6 @@ pub enum ChainOrchestratorError {
         /// The actual number of blocks.
         actual: usize,
     },
-    /// A gap was detected in batch commit events: the previous batch before index {0} is missing.
-    #[error("Batch commit gap detected at index {0}, previous batch commit not found")]
-    BatchCommitGap(u64),
     /// An error occurred while making a network request.
     #[error("Network request error: {0}")]
     NetworkRequestError(#[from] reth_network_p2p::error::RequestError),
@@ -92,6 +85,9 @@ pub enum ChainOrchestratorError {
     /// An error occurred while handling rollup node primitives.
     #[error("An error occurred while handling rollup node primitives: {0}")]
     RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError),
+    /// An error occurred during gap reset.
+    #[error("Gap reset error: {0}")]
+    GapResetError(String),
 }
 
 impl CanRetry for ChainOrchestratorError {
diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs
index e1a487db..16f255ff 100644
--- a/crates/chain-orchestrator/src/event.rs
+++ b/crates/chain-orchestrator/src/event.rs
@@ -43,6 +43,15 @@ pub enum ChainOrchestratorEvent {
         /// The L1 block number in which the batch was committed.
         l1_block_number: u64,
     },
+    /// A gap has been detected in the committed batches.
+    BatchCommitGap {
+        /// The missing batch index.
+        missing_index: u64,
+        /// The latest known L1 block number to reset to before the gap.
+        l1_block_number_reset: u64,
+    },
+    /// A duplicate batch commit has been detected.
+    BatchCommitDuplicate(u64),
     /// A batch has been finalized returning a list of finalized batches.
     BatchFinalized {
         /// The L1 block info at which the batch finalization event was received.
@@ -57,6 +66,15 @@ pub enum ChainOrchestratorEvent {
         /// The new safe head after the revert.
         safe_head: BlockInfo,
     },
+    /// A gap has been detected in the reverted batches.
+    BatchRevertGap {
+        /// The missing batch index.
+        missing_index: u64,
+        /// The latest known L1 block number to reset to before the gap.
+        l1_block_number_reset: u64,
+    },
+    /// A duplicate batch revert has been detected.
+    BatchRevertDuplicate(u64),
     /// A new L1 block has been received returning the L1 block number.
     NewL1Block(u64),
     /// An L1 block has been finalized returning the L1 block number and the list of finalized
@@ -64,6 +82,15 @@
     L1BlockFinalized(u64, Vec<BatchInfo>),
     /// A `L1Message` event has been committed returning the message queue index.
     L1MessageCommitted(u64),
+    /// A gap has been detected in the L1 message queue.
+    L1MessageGap {
+        /// The missing L1 message queue index.
+        missing_index: u64,
+        /// The latest known L1 block number to reset to before the gap.
+        l1_block_number_reset: u64,
+    },
+    /// A duplicate L1 message has been detected.
+    L1MessageDuplicate(u64),
     /// A reorg has occurred on L1, returning the L1 block number of the new L1 head,
     /// the L1 message queue index of the new L1 head, and optionally the L2 head and safe block
     /// info if the reorg resulted in a new L2 head or safe block.
diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs
index be1a13f4..5b65a4af 100644
--- a/crates/chain-orchestrator/src/lib.rs
+++ b/crates/chain-orchestrator/src/lib.rs
@@ -11,6 +11,7 @@ use reth_network_api::{BlockDownloaderProvider, FullNetwork};
 use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient};
 use reth_scroll_node::ScrollNetworkPrimitives;
 use reth_scroll_primitives::ScrollBlock;
+
 use reth_tasks::shutdown::Shutdown;
 use reth_tokio_util::{EventSender, EventStream};
 use rollup_node_primitives::{
@@ -20,13 +21,13 @@ use rollup_node_primitives::{
 use rollup_node_providers::L1MessageProvider;
 use rollup_node_sequencer::{Sequencer, SequencerEvent};
 use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
-use rollup_node_watcher::L1Notification;
+use rollup_node_watcher::{L1Notification, L1WatcherHandle};
 use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
 use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_alloy_network::Scroll;
 use scroll_alloy_provider::ScrollEngineApi;
 use scroll_db::{
-    Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey,
+    Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, TXMut,
     UnwindResult,
 };
 use scroll_derivation_pipeline::{BatchDerivationResult, DerivationPipeline};
@@ -35,7 +36,7 @@ use scroll_network::{
     BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent,
 };
 use std::{collections::VecDeque, sync::Arc, time::Instant, vec};
-use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver};
+use tokio::sync::mpsc::{self, UnboundedReceiver};
 
 mod config;
 pub use config::ChainOrchestratorConfig;
@@ -62,6 +63,10 @@ mod sync;
 pub use sync::{SyncMode, SyncState};
 
 mod status;
+use crate::ChainOrchestratorEvent::{
+    BatchCommitDuplicate, BatchCommitGap, BatchRevertDuplicate, BatchRevertGap, L1MessageDuplicate,
+    L1MessageGap,
+};
 pub use status::ChainOrchestratorStatus;
 
 /// Wraps a future, metering the completion of it.
@@ -115,8 +120,8 @@ pub struct ChainOrchestrator<
     database: Arc<Database>,
     /// The current sync state of the [`ChainOrchestrator`].
     sync_state: SyncState,
-    /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
-    l1_notification_rx: Receiver<Arc<L1Notification>>,
+    /// Handle to send commands to the L1 watcher (e.g., for gap recovery).
+    l1_watcher_handle: L1WatcherHandle,
     /// The network manager that manages the scroll p2p network.
     network: ScrollNetwork,
     /// The consensus algorithm used by the rollup node.
@@ -150,7 +155,7 @@ impl<
         config: ChainOrchestratorConfig,
         block_client: Arc<FullBlockClient<<N as BlockDownloaderProvider>::Client>>,
         l2_provider: L2P,
-        l1_notification_rx: Receiver<Arc<L1Notification>>,
+        l1_watcher_handle: L1WatcherHandle,
         network: ScrollNetwork,
         consensus: Box<dyn Consensus>,
         engine: Engine,
@@ -167,7 +172,7 @@
             database,
             config,
             sync_state: SyncState::default(),
-            l1_notification_rx,
+            l1_watcher_handle,
             network,
             consensus,
             engine,
@@ -224,7 +229,7 @@
                 let res = self.handle_network_event(event).await;
                 self.handle_outcome(res);
             }
-            Some(notification) = self.l1_notification_rx.recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
+            Some(notification) = self.l1_watcher_handle.l1_notification_receiver().recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
                 let res = self.handle_l1_notification(notification).await;
                 self.handle_outcome(res);
             }
@@ -754,8 +759,38 @@
 
                     // Perform a consistency check to ensure the previous commit batch exists in the
                     // database.
-                    if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
-                        return Err(ChainOrchestratorError::BatchCommitGap(batch.index));
+                    if tx
+                        .get_batch_by_index(prev_batch_index)
+                        .await?
+                        .iter()
+                        .filter(|x| x.reverted_block_number.is_none())
+                        .count() ==
+                        0
+                    {
+                        // Query database for the L1 block of the last known batch
+                        let reset_block = tx.get_last_batch_commit_l1_block().await?.unwrap_or(0);
+
+                        return Ok(Some(BatchCommitGap {
+                            missing_index: batch_info.index,
+                            l1_block_number_reset: reset_block,
+                        }));
+                    }
+
+                    // Check if batch already exists in DB.
+                    for existing_batch in tx.get_batch_by_index(batch.index).await? {
+                        if existing_batch.hash == batch.hash {
+                            // This means we have already processed this batch commit, we will skip
+                            // it.
+                            return Ok(Some(BatchCommitDuplicate(existing_batch.index)));
+                        } else if existing_batch.reverted_block_number.is_none() {
+                            // This means we have received a different batch commit at the same
+                            // index which has not been reverted yet ->
+                            // we missed a revert event.
+                            return Ok(Some(BatchRevertGap {
+                                missing_index: batch.index,
+                                l1_block_number_reset: existing_batch.block_number,
+                            }));
+                        }
                     }
 
                     let event = ChainOrchestratorEvent::BatchCommitIndexed {
@@ -774,8 +809,40 @@
             })
             .await?;
 
-        if self.sync_state.is_synced() {
-            self.derivation_pipeline.push_batch(batch_info, BatchStatus::Consolidated).await;
+        match event {
+            Some(BatchCommitGap { missing_index, l1_block_number_reset }) => {
+                tracing::warn!(
+                    target: "scroll::chain_orchestrator",
+                    "Batch commit gap detected at index {}, last known batch at L1 block {}",
+                    missing_index,
+                    l1_block_number_reset
+                );
+                self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
+            }
+            Some(BatchRevertGap { missing_index, l1_block_number_reset }) => {
+                tracing::warn!(
+                    target: "scroll::chain_orchestrator",
+                    "Batch revert gap detected at index {}, last known batch at L1 block {}",
+                    missing_index,
+                    l1_block_number_reset
+                );
+                self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
+            }
+            Some(BatchCommitDuplicate(index)) => {
+                tracing::info!(
+                    target: "scroll::chain_orchestrator",
+                    "Duplicate batch commit detected at {:?}, skipping",
+                    index
+                );
+            }
+            Some(ChainOrchestratorEvent::BatchCommitIndexed { .. }) => {
+                if self.sync_state.is_synced() {
+                    self.derivation_pipeline
+                        .push_batch(batch_info, BatchStatus::Consolidated)
+                        .await;
+                }
+            }
+            _ => {}
         }
 
         Ok(event)
@@ -824,6 +891,33 @@
         end_index: u64,
         l1_block_info: BlockInfo,
     ) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
+        let event = self
+            .database
+            .tx(move |tx| async move {
+                // Check if we received this revert already.
+                // If any of the batches with the same index was reverted at the same L1 block,
+                // then the event is a duplicate.
+                for existing_batch in tx.get_batch_by_index(end_index).await? {
+                    if existing_batch.reverted_block_number == Some(l1_block_info.number) {
+                        return Ok::<_, ChainOrchestratorError>(Some(BatchRevertDuplicate(
+                            existing_batch.index,
+                        )));
+                    }
+                }
+
+                Ok(None)
+            })
+            .await?;
+
+        if let Some(BatchRevertDuplicate(index)) = event {
+            tracing::info!(
+                target: "scroll::chain_orchestrator",
+                "Duplicate batch revert detected at {:?}, skipping",
+                index
+            );
+            return Ok(event);
+        }
+
         let (safe_block_info, batch_info) = self
             .database
             .tx_mut(move |tx| async move {
@@ -841,57 +935,110 @@
             .await?;
 
         // Update the forkchoice state to the new safe block.
-        if self.sync_state.is_synced() {
-            tracing::info!(target: "scroll::chain_orchestrator", ?safe_block_info, "Updating safe head to block after batch revert");
-            self.engine.update_fcs(None, Some(safe_block_info), None).await?;
-        }
+        self.engine.update_fcs(None, Some(safe_block_info), None).await?;
 
         Ok(Some(ChainOrchestratorEvent::BatchReverted { batch_info, safe_head: safe_block_info }))
     }
 
     /// Handles an L1 message by inserting it into the database.
     async fn handle_l1_message(
-        &self,
+        &mut self,
        l1_message: TxL1Message,
         l1_block_info: BlockInfo,
     ) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
-        let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index);
-        let queue_hash = compute_l1_message_queue_hash(
-            &self.database,
-            &l1_message,
-            self.config.l1_v2_message_queue_start_index(),
-        )
-        .await?;
-        let l1_message = L1MessageEnvelope::new(l1_message, l1_block_info.number, None, queue_hash);
-
-        // Perform a consistency check to ensure the previous L1 message exists in the database.
-        self.database
+        let l1_v2_message_queue_start_index = self.config.l1_v2_message_queue_start_index();
+
+        let event = self.database
             .tx_mut(move |tx| {
                 let l1_message = l1_message.clone();
+
                 async move {
-                    if l1_message.transaction.queue_index > 0 &&
+                    // check for gaps in the L1 message queue
+                    if l1_message.queue_index > 0 &&
                         tx.get_n_l1_messages(
                             Some(L1MessageKey::from_queue_index(
-                                l1_message.transaction.queue_index - 1,
+                                l1_message.queue_index - 1,
+                            )),
+                            1,
+                        )
+                        .await?
+                        .is_empty()
+                    {
+                        // Query database for the L1 block of the last known L1 message
+                        let reset_block =
+                            tx.get_last_l1_message_l1_block().await?.unwrap_or(0);
+
+                        return Ok::<_, ChainOrchestratorError>(Some(L1MessageGap {
+                            missing_index: l1_message.queue_index,
+                            l1_block_number_reset: reset_block,
+                        }));
+                    }
+
+                    // check if the L1 message already exists in the DB
+                    if let Some(existing_message) = tx
+                        .get_n_l1_messages(
+                            Some(L1MessageKey::from_queue_index(
+                                l1_message.queue_index,
                             )),
                             1,
                         )
                         .await?
-                        .is_empty()
+                        .pop()
                     {
-                        return Err(ChainOrchestratorError::L1MessageQueueGap(
-                            l1_message.transaction.queue_index,
-                        ));
+                        if existing_message.transaction.tx_hash() ==
+                            l1_message.tx_hash()
+                        {
+                            // We have already processed this L1 message, we will skip it.
+                            return Ok(Some(L1MessageDuplicate(l1_message.queue_index)));
+                        }
+
+                        // This should not happen in normal operation as messages should be
+                        // deleted when an L1 reorg is handled; log a warning.
+                        tracing::warn!(
+                            target: "scroll::chain_orchestrator",
+                            "L1 message queue index {} already exists with different hash in DB {:?} vs {:?}",
+                            l1_message.queue_index,
+                            existing_message.transaction.tx_hash(),
+                            l1_message.tx_hash()
+                        );
                     }
 
+                    // We compute the queue hash at the end as it requires the previous message.
+                    let queue_hash = compute_l1_message_queue_hash(
+                        &tx,
+                        &l1_message,
+                        l1_v2_message_queue_start_index,
+                    )
+                    .await?;
+
+                    let l1_message =
+                        L1MessageEnvelope::new(l1_message, l1_block_info.number, None, queue_hash);
+
                     tx.insert_l1_message(l1_message.clone()).await?;
                     tx.insert_l1_block_info(l1_block_info).await?;
-                    Ok::<_, ChainOrchestratorError>(())
+
+                    Ok(Some(ChainOrchestratorEvent::L1MessageCommitted(
+                        l1_message.transaction.queue_index,
+                    )))
                 }
             })
             .await?;
 
-        Ok(Some(event))
+        match event {
+            Some(L1MessageGap { missing_index, l1_block_number_reset }) => {
+                tracing::warn!(
+                    target: "scroll::chain_orchestrator",
+                    "L1 message queue gap detected at index {}, last known message at L1 block {}",
+                    missing_index,
+                    l1_block_number_reset
+                );
+                self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
+            }
+            Some(L1MessageDuplicate(index)) => {
+                tracing::info!(
+                    target: "scroll::chain_orchestrator",
+                    "Duplicate L1 message detected at {:?}, skipping",
+                    index
+                );
+            }
+            _ => {}
+        }
+
+        Ok(event)
     }
 
     async fn handle_network_event(
@@ -1346,7 +1493,7 @@
 ///
 /// The solidity contract (`L1MessageQueueV2.sol`) implementation is defined here:
 async fn compute_l1_message_queue_hash(
-    database: &Arc<Database>,
+    tx: &Arc<TXMut>,
     l1_message: &TxL1Message,
     l1_v2_message_queue_start_index: u64,
 ) -> Result<Option<FixedBytes<32>>, ChainOrchestratorError> {
@@ -1356,7 +1503,7 @@
         Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
     } else if l1_message.queue_index > l1_v2_message_queue_start_index {
         let index = l1_message.queue_index - 1;
-        let mut input = database
+        let mut input = tx
             .get_n_l1_messages(Some(L1MessageKey::from_queue_index(index)), 1)
             .await?
             .first()
diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs
index 999e4b28..30209dd3 100644
--- a/crates/database/db/src/db.rs
+++ b/crates/database/db/src/db.rs
@@ -537,7 +537,7 @@ impl DatabaseReadOperations for Database {
     async fn get_batch_by_index(
         &self,
         batch_index: u64,
-    ) -> Result<Option<BatchCommitData>, DatabaseError> {
+    ) -> Result<Vec<BatchCommitData>, DatabaseError> {
         metered!(
             DatabaseOperation::GetBatchByIndex,
             self,
             tx(|tx| async move { tx.get_batch_by_index(batch_index).await })
         )
     }
@@ -616,6 +616,22 @@
         )
     }
 
+    async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
+        metered!(
+            DatabaseOperation::GetLastBatchCommitL1Block,
+            self,
+            tx(|tx| async move { tx.get_last_batch_commit_l1_block().await })
+        )
+    }
+
+    async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
+        metered!(
+            DatabaseOperation::GetLastL1MessageL1Block,
+            self,
+            tx(|tx| async move { tx.get_last_l1_message_l1_block().await })
+        )
+    }
+
     async fn get_n_l1_messages(
         &self,
         start: Option<L1MessageKey>,
@@ -879,8 +895,7 @@ mod test {
 
         // Round trip the BatchCommitData through the database.
         db.insert_batch(batch_commit.clone()).await.unwrap();
-        let batch_commit_from_db =
-            db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
+        let batch_commit_from_db = db.get_batch_by_hash(batch_commit.hash).await.unwrap().unwrap();
 
         assert_eq!(batch_commit, batch_commit_from_db);
     }
 
@@ -1400,7 +1415,7 @@
 
         // Insert L2 blocks with different batch indices
         for i in 100..110 {
-            let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
+            let batch_data = db.get_batch_by_index(i).await.unwrap().first().unwrap().clone();
             let batch_info: BatchInfo = batch_data.into();
             let block_info =
                 BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
@@ -1577,9 +1592,9 @@
         db.set_finalized_l1_block_number(21).await.unwrap();
 
         // Verify the batches and blocks were inserted correctly
-        let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().unwrap();
-        let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().unwrap();
-        let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().unwrap();
+        let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().first().unwrap().clone();
+        let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().first().unwrap().clone();
+        let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().first().unwrap().clone();
         let retried_block_1 = db.get_l2_block_info_by_number(1).await.unwrap().unwrap();
         let retried_block_2 = db.get_l2_block_info_by_number(2).await.unwrap().unwrap();
         let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap().unwrap();
diff --git a/crates/database/db/src/metrics.rs b/crates/database/db/src/metrics.rs
index 2240d098..3d2e6947 100644
--- a/crates/database/db/src/metrics.rs
+++ b/crates/database/db/src/metrics.rs
@@ -62,6 +62,8 @@ pub(crate) enum DatabaseOperation {
     GetFinalizedL1BlockNumber,
     GetProcessedL1BlockNumber,
     GetL2HeadBlockNumber,
+    GetLastBatchCommitL1Block,
+    GetLastL1MessageL1Block,
     GetNL1Messages,
     GetNL2BlockDataHint,
     GetL2BlockAndBatchInfoByHash,
@@ -130,6 +132,8 @@ impl DatabaseOperation {
             Self::GetFinalizedL1BlockNumber => "get_finalized_l1_block_number",
             Self::GetProcessedL1BlockNumber => "get_processed_l1_block_number",
             Self::GetL2HeadBlockNumber => "get_l2_head_block_number",
+            Self::GetLastBatchCommitL1Block => "get_last_batch_commit_l1_block",
+            Self::GetLastL1MessageL1Block => "get_last_l1_message_l1_block",
             Self::GetNL1Messages => "get_n_l1_messages",
             Self::GetNL2BlockDataHint => "get_n_l2_block_data_hint",
             Self::GetL2BlockAndBatchInfoByHash => "get_l2_block_and_batch_info_by_hash",
diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs
index 8d75d8fd..82b1a9a1 100644
--- a/crates/database/db/src/operations.rs
+++ b/crates/database/db/src/operations.rs
@@ -953,7 +953,7 @@ pub trait DatabaseReadOperations {
     async fn get_batch_by_index(
         &self,
         batch_index: u64,
-    ) -> Result<Option<BatchCommitData>, DatabaseError>;
+    ) -> Result<Vec<BatchCommitData>, DatabaseError>;
 
     /// Get a [`BatchCommitData`] from the database by its batch hash.
     async fn get_batch_by_hash(
         &self,
         batch_hash: B256,
@@ -986,6 +986,12 @@
     /// Get the latest L2 head block info.
     async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError>;
 
+    /// Get the L1 block number of the last batch commit in the database.
+    async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError>;
+
+    /// Get the L1 block number of the last L1 message in the database.
+    async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError>;
+
     /// Get a vector of n [`L1MessageEnvelope`]s in the database starting from the provided `start`
     /// point.
     async fn get_n_l1_messages(
         &self,
         start: Option<L1MessageKey>,
@@ -1042,18 +1048,16 @@ impl DatabaseReadOperations for T {
     async fn get_batch_by_index(
         &self,
         batch_index: u64,
-    ) -> Result<Option<BatchCommitData>, DatabaseError> {
+    ) -> Result<Vec<BatchCommitData>, DatabaseError> {
         Ok(models::batch_commit::Entity::find()
             .filter(
                 models::batch_commit::Column::Index
-                    .eq(TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"))
-                    .and(models::batch_commit::Column::RevertedBlockNumber.is_null()),
+                    .eq(TryInto::<i64>::try_into(batch_index).expect("index should fit in i64")),
             )
-            .one(self.get_connection())
+            .all(self.get_connection())
             .await
-            .map(|x| x.map(Into::into))?)
+            .map(|x| x.into_iter().map(Into::into).collect())?)
     }
-
     async fn get_batch_by_hash(
         &self,
         batch_hash: B256,
@@ -1181,6 +1185,29 @@
             .expect("l2_head_block should always be a valid u64"))
     }
 
+    async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
+        Ok(models::batch_commit::Entity::find()
+            .filter(models::batch_commit::Column::RevertedBlockNumber.is_null())
+            .order_by_desc(models::batch_commit::Column::BlockNumber)
+            .select_only()
+            .column(models::batch_commit::Column::BlockNumber)
+            .into_tuple::<i64>()
+            .one(self.get_connection())
+            .await?
+            .map(|block_number| block_number as u64))
+    }
+
+    async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
+        Ok(models::l1_message::Entity::find()
+            .order_by_desc(models::l1_message::Column::L1BlockNumber)
+            .select_only()
+            .column(models::l1_message::Column::L1BlockNumber)
+            .into_tuple::<i64>()
+            .one(self.get_connection())
+            .await?
+            .map(|block_number| block_number as u64))
+    }
+
     async fn get_n_l1_messages(
         &self,
         start: Option<L1MessageKey>,
diff --git a/crates/derivation-pipeline/benches/pipeline.rs b/crates/derivation-pipeline/benches/pipeline.rs
index 048af0d5..6c01f4ff 100644
--- a/crates/derivation-pipeline/benches/pipeline.rs
+++ b/crates/derivation-pipeline/benches/pipeline.rs
@@ -67,13 +67,14 @@ type L1ProviderFactory<P> =
 /// Returns a pipeline with a provider initiated from the factory function.
 async fn setup_pipeline<P>(
     factory: L1ProviderFactory<P>,
-) -> DerivationPipeline<P> {
+) -> (DerivationPipeline<P>, Vec<BatchCommitData>) {
     // load batch data in the db.
     let db = Arc::new(setup_test_db().await);
     let blob_hashes: Vec<B256> = serde_json::from_str(
         &std::fs::read_to_string("./benches/testdata/batch_info.json").unwrap(),
     )
     .unwrap();
+    let mut batches = vec![];
     for (index, hash) in (BATCHES_START_INDEX..=BATCHES_STOP_INDEX).zip(blob_hashes.into_iter()) {
         let raw_calldata =
@@ -89,7 +90,8 @@
             finalized_block_number: None,
             reverted_block_number: None,
         };
-        db.insert_batch(batch_data).await.unwrap();
+        db.insert_batch(batch_data.clone()).await.unwrap();
+        batches.push(batch_data);
     }
 
     // load messages in db.
@@ -108,7 +110,7 @@
 
     // construct the pipeline.
     let l1_provider = factory(db.clone()).await;
-    DerivationPipeline::new(l1_provider, db, u64::MAX).await
+    (DerivationPipeline::new(l1_provider, db, u64::MAX).await, batches)
 }
 
 /// Benchmark the derivation pipeline with blobs fetched from file. This does not bench the network
@@ -122,11 +124,12 @@
             let (tx, rx) = std::sync::mpsc::channel();
             Handle::current().spawn(async move {
                 // setup (not measured): create fresh pipeline with 253 committed batches
-                let mut pipeline = setup_pipeline(Box::new(setup_mock_provider)).await;
+                let (mut pipeline, batches) =
+                    setup_pipeline(Box::new(setup_mock_provider)).await;
 
                 // commit 253 batches.
-                for index in BATCHES_START_INDEX..=BATCHES_STOP_INDEX {
-                    let batch_info = BatchInfo { index, hash: Default::default() };
+                for batch in batches {
+                    let batch_info = BatchInfo { index: batch.index, hash: batch.hash };
                     pipeline.push_batch(batch_info, BatchStatus::Committed).await;
                 }
 
@@ -158,11 +161,12 @@
             let (tx, rx) = std::sync::mpsc::channel();
             Handle::current().spawn(async move {
                 // setup (not measured): create fresh pipeline with 15 committed batches
-                let mut pipeline = setup_pipeline(Box::new(setup_full_provider)).await;
+                let (mut pipeline, batches) =
+                    setup_pipeline(Box::new(setup_full_provider)).await;
 
                 // commit 15 batches.
-                for index in BATCHES_START_INDEX..=BATCHES_START_INDEX + 15 {
-                    let batch_info = BatchInfo { index, hash: Default::default() };
+                for batch in batches {
+                    let batch_info = BatchInfo { index: batch.index, hash: batch.hash };
                     pipeline.push_batch(batch_info, BatchStatus::Committed).await;
                 }
 
diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs
index 4bd71bf0..afb615dd 100644
--- a/crates/derivation-pipeline/src/lib.rs
+++ b/crates/derivation-pipeline/src/lib.rs
@@ -218,7 +218,7 @@ where
 
         // get the batch commit data.
         let batch = db
-            .get_batch_by_index(batch_info.index)
+            .get_batch_by_hash(batch_info.hash)
             .await
             .map_err(|err| (request.clone(), err.into()))?
             .ok_or((
@@ -450,6 +450,7 @@ mod tests {
 
     use alloy_eips::Decodable2718;
     use alloy_primitives::{address, b256, bytes, U256};
+    use eyre::bail;
     use futures::StreamExt;
     use rollup_node_primitives::L1MessageEnvelope;
     use rollup_node_providers::{test_utils::MockL1Provider, L1ProviderError};
@@ -510,7 +511,7 @@
             finalized_block_number: None,
             reverted_block_number: None,
         };
-        db.insert_batch(batch_data).await?;
+        db.insert_batch(batch_data.clone()).await?;
 
         // load message in db, leaving a l1 message missing.
         db.insert_l1_message(L1_MESSAGE_INDEX_33).await?;
@@ -521,10 +522,7 @@
 
         // as long as we don't call `push_batch`, pipeline should not return attributes.
         pipeline
-            .push_batch(
-                BatchInfo { index: 12, hash: Default::default() },
-                BatchStatus::Consolidated,
-            )
+            .push_batch(BatchInfo { index: 12, hash: batch_data.hash }, BatchStatus::Consolidated)
             .await;
 
         // wait for 5 seconds to ensure the pipeline is in a retry loop.
@@ -541,14 +539,22 @@
 
         // check the correctness of the last attribute.
         let mut attribute = ScrollPayloadAttributes::default();
-        if let Some(BatchDerivationResult { attributes, .. }) = pipeline.next().await {
-            for a in attributes {
-                if a.attributes.payload_attributes.timestamp == 1696935657 {
-                    attribute = a.attributes;
-                    break;
+        tokio::select! {
+            _ = tokio::time::sleep(tokio::time::Duration::from_secs(5000)) => {
+                bail!("pipeline did not yield in time");
+            }
+            maybe_result = pipeline.next() => {
+                if let Some(BatchDerivationResult { attributes, .. }) = maybe_result {
+                    for a in attributes {
+                        if a.attributes.payload_attributes.timestamp == 1696935657 {
+                            attribute = a.attributes;
+                            break;
+                        }
+                    }
                 }
             }
         }
+
         let expected = ScrollPayloadAttributes {
             payload_attributes: PayloadAttributes {
                 timestamp: 1696935657,
@@ -582,7 +588,7 @@
             finalized_block_number: None,
             reverted_block_number: None,
         };
-        db.insert_batch(batch_data).await?;
+        db.insert_batch(batch_data.clone()).await?;
         // load messages in db.
         let l1_messages = vec![L1_MESSAGE_INDEX_33, L1_MESSAGE_INDEX_34];
         for message in l1_messages {
@@ -595,7 +601,7 @@
 
         // as long as we don't call `push_batch`, pipeline should not return attributes.
         pipeline
-            .push_batch(BatchInfo { index: 12, hash: Default::default() }, BatchStatus::Committed)
+            .push_batch(BatchInfo { index: 12, hash: batch_data.hash }, BatchStatus::Committed)
             .await;
 
         // check the correctness of the last attribute.
diff --git a/crates/node/src/add_ons/handle.rs b/crates/node/src/add_ons/handle.rs
index a293e6b5..1948e865 100644
--- a/crates/node/src/add_ons/handle.rs
+++ b/crates/node/src/add_ons/handle.rs
@@ -5,6 +5,10 @@ use reth_rpc_eth_api::EthApiTypes;
 use reth_scroll_node::ScrollNetworkPrimitives;
 use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
 #[cfg(feature = "test-utils")]
+use tokio::sync::mpsc::UnboundedReceiver;
+#[cfg(feature = "test-utils")]
+use tokio::sync::Mutex;
+#[cfg(feature = "test-utils")]
 use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
 
 /// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
@@ -20,6 +24,10 @@ pub struct ScrollAddOnsHandle<
     /// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
     #[cfg(feature = "test-utils")]
     pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
+    /// A channel used to receive commands sent from the `RollupNodeManager` to the
+    /// `L1Watcher`.
+    #[cfg(feature = "test-utils")]
+    pub l1_watcher_command_rx: Arc<Mutex<UnboundedReceiver<L1WatcherCommand>>>,
 }
 
 impl<
diff --git a/crates/node/src/add_ons/mod.rs b/crates/node/src/add_ons/mod.rs
index 4dbb797d..2b4ba0c5 100644
--- a/crates/node/src/add_ons/mod.rs
+++ b/crates/node/src/add_ons/mod.rs
@@ -2,6 +2,7 @@
 
 use super::args::ScrollRollupNodeConfig;
 use crate::constants;
+use std::sync::Arc;
 
 use reth_evm::{ConfigureEngineEvm, EvmFactory, EvmFactoryFor};
 use reth_network::NetworkProtocols;
@@ -37,7 +38,7 @@ pub use rpc::{RollupNodeExtApiClient, RollupNodeExtApiServer, RollupNodeRpcExt};
 mod rollup;
 pub use rollup::IsDevChain;
 use rollup::RollupManagerAddOn;
-use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
 
 /// Add-ons for the Scroll follower node.
 #[derive(Debug)]
@@ -137,7 +138,7 @@
         }
 
         let rpc_handle = rpc_add_ons.launch_add_ons_with(ctx.clone(), |_| Ok(())).await?;
 
-        let (rollup_manager_handle, l1_watcher_tx) =
+        let (rollup_manager_handle, l1_watcher_tx, l1_watcher_command_rx) =
             rollup_node_manager_addon.launch(ctx.clone(), rpc_handle.clone()).await?;
 
         tx.send(rollup_manager_handle.clone())
@@ -148,6 +149,10 @@
             rpc_handle,
             #[cfg(feature = "test-utils")]
             l1_watcher_tx,
+            #[cfg(feature = "test-utils")]
+            l1_watcher_command_rx: Arc::new(Mutex::new(
+                l1_watcher_command_rx.expect("l1_watcher_command_rx must exist in test utils"),
+            )),
         })
     }
 }
diff --git a/crates/node/src/add_ons/rollup.rs b/crates/node/src/add_ons/rollup.rs
index 93e88461..61566218 100644
--- a/crates/node/src/add_ons/rollup.rs
+++ b/crates/node/src/add_ons/rollup.rs
@@ -9,7 +9,7 @@ use reth_rpc_eth_api::EthApiTypes;
 use reth_scroll_chainspec::{ChainConfig, ScrollChainConfig, ScrollChainSpec};
 use reth_scroll_node::ScrollNetworkPrimitives;
 use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
-use rollup_node_watcher::L1Notification;
+use rollup_node_watcher::{L1Notification, L1WatcherCommand};
 use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_wire::ScrollWireEvent;
 use std::sync::Arc;
@@ -55,13 +55,17 @@ impl RollupManagerAddOn {
         self,
         ctx: AddOnsContext<'_, N>,
         rpc: RpcHandle,
-    ) -> eyre::Result<(ChainOrchestratorHandle, Option<Sender<Arc<L1Notification>>>)>
+    ) -> eyre::Result<(
+        ChainOrchestratorHandle,
+        Option<Sender<Arc<L1Notification>>>,
+        Option<UnboundedReceiver<L1WatcherCommand>>,
+    )>
     where
         <N::Types as NodeTypes>::ChainSpec: ChainConfig + ScrollHardforks + IsDevChain,
         N::Network: NetworkProtocols + FullNetwork,
     {
-        let (chain_orchestrator, handle, l1_notification_tx) = self
+        let (chain_orchestrator, handle, l1_notification_tx, l1_watcher_command_rx) = self
             .config
             .build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles)
             .await?;
@@ -70,6 +74,6 @@
             .spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| {
                 chain_orchestrator.run_until_shutdown(shutdown)
             });
-        Ok((handle, l1_notification_tx))
+        Ok((handle, l1_notification_tx, l1_watcher_command_rx))
     }
 }
diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs
index 7fca258b..eefaf7ef 100644
--- a/crates/node/src/args.rs
+++ b/crates/node/src/args.rs
@@ -38,7 +38,7 @@ use rollup_node_providers::{
 use rollup_node_sequencer::{
     L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig,
 };
-use rollup_node_watcher::{L1Notification, L1Watcher};
+use rollup_node_watcher::{L1Notification, L1Watcher, L1WatcherCommand, L1WatcherHandle};
 use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_alloy_network::Scroll;
 use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
@@ -167,6 +167,7 @@
         >,
         ChainOrchestratorHandle,
         Option<Sender<Arc<L1Notification>>>,
+        Option<UnboundedReceiver<L1WatcherCommand>>,
     )>
     where
         N: FullNetwork + NetworkProtocols,
@@ -340,35 +341,39 @@
         };
         let consensus = self.consensus_args.consensus(authorized_signer)?;
 
-        let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
-            if let Some(provider) = l1_provider.filter(|_| !self.test) {
-                tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
-                (
-                    None,
-                    Some(
-                        L1Watcher::spawn(
-                            provider,
-                            l1_block_startup_info,
-                            node_config,
-                            self.l1_provider_args.logs_query_block_range,
-                        )
-                        .await,
-                    ),
-                )
-            } else {
-                // Create a channel for L1 notifications that we can use to inject L1 messages for
-                // testing
-                #[cfg(feature = "test-utils")]
-                {
-                    let (tx, rx) = tokio::sync::mpsc::channel(1000);
-                    (Some(tx), Some(rx))
-                }
-
-                #[cfg(not(feature = "test-utils"))]
-                {
-                    (None, None)
-                }
-            };
+        #[allow(clippy::type_complexity)]
+        let (l1_notification_tx, l1_watcher_handle, l1_watcher_command_rx): (
+            Option<Sender<Arc<L1Notification>>>,
+            Option<L1WatcherHandle>,
+            Option<UnboundedReceiver<L1WatcherCommand>>,
+        ) = if let Some(provider) = l1_provider.filter(|_| !self.test) {
+            tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
+            let handle = L1Watcher::spawn(
+                provider,
+                l1_block_startup_info,
+                node_config,
+                self.l1_provider_args.logs_query_block_range,
+            )
+            .await;
+            (None, Some(handle), None)
+        } else {
+            // Create a channel for L1 notifications that we can use to inject L1 messages for
+            // testing
+            #[cfg(feature = "test-utils")]
+            {
+                let (tx, rx) = tokio::sync::mpsc::channel(1000);
+
+                let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
+                let handle = L1WatcherHandle::new(command_tx, rx);
+
+                (Some(tx), Some(handle), Some(command_rx))
+            }
+
+            #[cfg(not(feature = "test-utils"))]
+            {
+                (None, None, None)
+            }
+        };
 
         // Construct the l1 provider.
         let l1_messages_provider = db.clone();
@@ -447,7 +452,7 @@
             config,
             Arc::new(block_client),
             l2_provider,
-            l1_notification_rx.expect("L1 notification receiver should be set"),
+            l1_watcher_handle.expect("L1 watcher handle should be set"),
             scroll_network_handle.into_scroll_network().await,
             consensus,
             engine,
@@ -457,7 +462,7 @@
         )
         .await?;
 
-        Ok((chain_orchestrator, handle, l1_notification_tx))
+        Ok((chain_orchestrator, handle, l1_notification_tx, l1_watcher_command_rx))
     }
 }
diff --git a/crates/node/src/test_utils/event_utils.rs b/crates/node/src/test_utils/event_utils.rs
index 9d7b7da1..e572fc93 100644
--- a/crates/node/src/test_utils/event_utils.rs
+++ b/crates/node/src/test_utils/event_utils.rs
@@ -112,6 +112,39 @@ impl<'a> EventWaiter<'a> {
         Ok(())
     }
 
+    /// Wait for L1 message duplicate event on all specified nodes.
+    pub async fn l1_message_duplicate(self, expected_queue_index: u64) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::L1MessageDuplicate(queue_index) = e {
+                (*queue_index == expected_queue_index).then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
+    /// Wait for L1 message gap event on all specified nodes.
+    pub async fn l1_message_gap(
+        self,
+        expected_missing_index: u64,
+        expected_l1_block_number_reset: u64,
+    ) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::L1MessageGap { missing_index, l1_block_number_reset } = e
+            {
+                (*missing_index == expected_missing_index &&
+                    *l1_block_number_reset == expected_l1_block_number_reset)
+                    .then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
     /// Wait for L1 reorg event to be received by all.
     pub async fn l1_reorg(self) -> eyre::Result<()> {
         self.wait_for_event_on_all(|e| {
@@ -143,6 +176,93 @@
         Ok(())
     }
 
+    /// Wait for batch commit indexed event on all specified nodes.
+    pub async fn batch_commit_indexed(
+        self,
+        target_batch_index: u64,
+        target_l1_block_number: u64,
+    ) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::BatchCommitIndexed { batch_info, l1_block_number } = e {
+                (batch_info.index == target_batch_index &&
+                    *l1_block_number == target_l1_block_number)
+                    .then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
+    /// Wait for batch commit duplicate event on all specified nodes.
+    pub async fn batch_commit_duplicates(self, target_batch_index: u64) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::BatchCommitDuplicate(batch_index) = e {
+                (*batch_index == target_batch_index).then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
+    /// Wait for batch commit gap event on all specified nodes.
+    pub async fn batch_commit_gap(
+        self,
+        expected_missing_index: u64,
+        expected_l1_block_number_reset: u64,
+    ) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::BatchCommitGap { missing_index, l1_block_number_reset } =
+                e
+            {
+                (*missing_index == expected_missing_index &&
+                    *l1_block_number_reset == expected_l1_block_number_reset)
+                    .then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
+    /// Wait for batch revert duplicate event on all specified nodes.
+    pub async fn batch_revert_duplicate(self, expected_index: u64) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::BatchRevertDuplicate(index) = e {
+                (*index == expected_index).then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
+    /// Wait for batch revert gap event on all specified nodes.
+    pub async fn batch_revert_gap(
+        self,
+        expected_missing_index: u64,
+        expected_l1_block_number_reset: u64,
+    ) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            if let ChainOrchestratorEvent::BatchRevertGap { missing_index, l1_block_number_reset } =
+                e
+            {
+                (*missing_index == expected_missing_index &&
+                    *l1_block_number_reset == expected_l1_block_number_reset)
+                    .then_some(())
+            } else {
+                None
+            }
+        })
+        .await?;
+        Ok(())
+    }
+
     /// Wait for batch reverted event on all specified nodes.
     pub async fn batch_reverted(self) -> eyre::Result<()> {
         self.wait_for_event_on_all(|e| {
diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs
index 2ae50eb9..255e2ad9 100644
--- a/crates/node/src/test_utils/fixture.rs
+++ b/crates/node/src/test_utils/fixture.rs
@@ -27,7 +27,7 @@ use reth_tokio_util::EventStream;
 use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle};
 use rollup_node_primitives::BlockInfo;
 use rollup_node_sequencer::L1MessageInclusionMode;
-use rollup_node_watcher::L1Notification;
+use rollup_node_watcher::{L1Notification, L1WatcherCommand};
 use scroll_alloy_consensus::ScrollPooledTransaction;
 use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
 use scroll_alloy_rpc_types::Transaction;
@@ -36,8 +36,9 @@
     fmt::{Debug, Formatter},
     path::PathBuf,
     sync::Arc,
+    time::Duration,
 };
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, mpsc::UnboundedReceiver, Mutex};
 
 /// Main test fixture providing a high-level interface for testing rollup nodes.
 #[derive(Debug)]
@@ -77,6 +78,8 @@
     pub engine: Engine<Arc<ScrollAuthApiEngineClient>>,
     /// L1 watcher notification channel.
     pub l1_watcher_tx: Option<mpsc::Sender<Arc<L1Notification>>>,
+    /// L1 watcher command receiver.
+    pub l1_watcher_command_rx: Arc<Mutex<UnboundedReceiver<L1WatcherCommand>>>,
     /// Chain orchestrator listener.
     pub chain_orchestrator_rx: EventStream<ChainOrchestratorEvent>,
     /// Chain orchestrator handle.
@@ -95,6 +98,14 @@
     pub const fn is_follower(&self) -> bool {
         matches!(self.typ, NodeType::Follower)
     }
+
+    /// Update the L1 watcher notification sender.
+    ///
+    /// This is used after gap recovery to update the sender to the new channel
+    /// that the `ChainOrchestrator` is now listening on.
+    pub fn set_l1_watcher_tx(&mut self, tx: mpsc::Sender<Arc<L1Notification>>) {
+        self.l1_watcher_tx = Some(tx);
+    }
 }
 
 impl Debug for NodeHandle {
@@ -201,6 +212,62 @@
     ) -> eyre::Result<ChainOrchestratorStatus> {
         self.get_status(0).await
     }
+
+    /// Wait for an L1 watcher command on the sequencer node.
+    ///
+    /// Returns the received command or an error if timeout is reached.
+    pub async fn expect_l1_watcher_command(&self) -> eyre::Result<L1WatcherCommand> {
+        self.expect_l1_watcher_command_on(0).await
+    }
+
+    /// Wait for an L1 watcher command on a specific node.
+    ///
+    /// Returns the received command or an error if timeout is reached.
+    pub async fn expect_l1_watcher_command_on(
+        &self,
+        node_index: usize,
+    ) -> eyre::Result<L1WatcherCommand> {
+        let mut command_rx = self.nodes[node_index].l1_watcher_command_rx.lock().await;
+        tokio::time::timeout(Duration::from_secs(5), command_rx.recv())
+            .await
+            .map_err(|_| eyre::eyre!("Timeout waiting for L1 watcher command"))?
+            .ok_or_else(|| eyre::eyre!("L1 watcher command channel closed"))
+    }
+
+    /// Process an L1 watcher reset command and update the notification sender.
+    ///
+    /// After a gap is detected, the `ChainOrchestrator` calls `trigger_gap_recovery` which
+    /// creates a new notification channel and sends the new sender via the command channel.
+    /// This method reads that command and updates the test fixture's `l1_watcher_tx` so
+    /// subsequent L1 notifications can be sent to the `ChainOrchestrator`.
+    ///
+    /// Returns the block number that the L1 watcher should reset to.
+    pub async fn process_gap_recovery_command(&mut self) -> eyre::Result<u64> {
+        self.process_gap_recovery_command_on(0).await
+    }
+
+    /// Process an L1 watcher reset command on a specific node.
+    ///
+    /// See [`Self::process_gap_recovery_command`] for details.
+    pub async fn process_gap_recovery_command_on(
+        &mut self,
+        node_index: usize,
+    ) -> eyre::Result<u64> {
+        let command = {
+            let mut command_rx = self.nodes[node_index].l1_watcher_command_rx.lock().await;
+            tokio::time::timeout(Duration::from_secs(5), command_rx.recv())
+                .await
+                .map_err(|_| eyre::eyre!("Timeout waiting for L1 watcher command"))?
+                .ok_or_else(|| eyre::eyre!("L1 watcher command channel closed"))?
+        };
+
+        match command {
+            L1WatcherCommand::ResetToBlock { block, new_sender } => {
+                self.nodes[node_index].set_l1_watcher_tx(new_sender);
+                Ok(block)
+            }
+        }
+    }
 }
 
 /// Builder for creating test fixtures with a fluent API.
@@ -452,6 +519,7 @@
 
         // Get handles if available
         let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone();
+        let l1_watcher_command_rx = node.inner.add_ons_handle.l1_watcher_command_rx.clone();
         let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
         let chain_orchestrator_rx =
             node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
@@ -461,6 +529,7 @@
             engine,
             chain_orchestrator_rx,
             l1_watcher_tx,
+            l1_watcher_command_rx,
             rollup_manager_handle,
             typ: if config.sequencer_args.sequencer_enabled && index == 0 {
                 NodeType::Sequencer
diff --git a/crates/node/src/test_utils/l1_helpers.rs b/crates/node/src/test_utils/l1_helpers.rs
index d6c858cf..9fac2af0 100644
--- a/crates/node/src/test_utils/l1_helpers.rs
+++ b/crates/node/src/test_utils/l1_helpers.rs
@@ -96,6 +96,11 @@ impl<'a> L1Helper<'a> {
         BatchRevertBuilder::new(self)
     }
 
+    /// Create a batch revert range builder.
+    pub fn revert_batch_range(self) -> BatchRevertRangeBuilder<'a> {
+        BatchRevertRangeBuilder::new(self)
+    }
+
     /// Send an L1 finalized block notification.
     pub async fn finalize_l1_block(self, block_number: u64) -> eyre::Result<()> {
         let notification = Arc::new(L1Notification::Finalized(block_number));
@@ -453,3 +458,58 @@
         self.l1_helper.send_to_nodes(notification).await
     }
 }
+
+/// Builder for creating batch revert range notifications in tests.
+#[derive(Debug)]
+pub struct BatchRevertRangeBuilder<'a> {
+    l1_helper: L1Helper<'a>,
+    block_info: BlockInfo,
+    start: u64,
+    end: u64,
+}
+
+impl<'a> BatchRevertRangeBuilder<'a> {
+    fn new(l1_helper: L1Helper<'a>) -> Self {
+        Self {
+            l1_helper,
+            block_info: BlockInfo { number: 0, hash: B256::random() },
+            start: 0,
+            end: 0,
+        }
+    }
+
+    /// Set the L1 block info for this batch revert range.
+    pub const fn at_block(mut self, block_info: BlockInfo) -> Self {
+        self.block_info = block_info;
+        self
+    }
+
+    /// Set the L1 block number for this batch revert range.
+    pub const fn at_block_number(mut self, block_number: u64) -> Self {
+        self.block_info.number = block_number;
+        self
+    }
+
+    /// Set the start batch index.
+    pub const fn start(mut self, start: u64) -> Self {
+        self.start = start;
+        self
+    }
+
+    /// Set the end batch index.
+    pub const fn end(mut self, end: u64) -> Self {
+        self.end = end;
+        self
+    }
+
+    /// Send the batch revert range notification.
+    pub async fn send(self) -> eyre::Result<()> {
+        let notification = Arc::new(L1Notification::BatchRevertRange {
+            start: self.start,
+            end: self.end,
+            block_info: self.block_info,
+        });
+
+        self.l1_helper.send_to_nodes(notification).await
+    }
+}
diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs
index fc9dba89..537db55c 100644
--- a/crates/node/tests/e2e.rs
+++ b/crates/node/tests/e2e.rs
@@ -4,6 +4,7 @@ use alloy_eips::BlockNumberOrTag;
 use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256};
 use alloy_signer::Signer;
 use alloy_signer_local::PrivateKeySigner;
+use eyre::{bail, Ok};
 use futures::{task::noop_waker_ref, FutureExt, StreamExt};
 use reth_chainspec::EthChainSpec;
 use reth_network::{NetworkConfigBuilder, NetworkEventListenerProvider, PeersInfo};
@@ -38,7 +39,7 @@
     task::{Context, Poll},
     time::Duration,
 };
-use tokio::{sync::Mutex, time};
+use tokio::{select, sync::Mutex, time};
 use tracing::trace;
 
 #[tokio::test]
@@ -574,7 +575,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<()
     config.hydrate(node.inner.config.clone()).await?;
 
     let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
-    let (chain_orchestrator, handle, l1_notification_tx) = config
+    let (chain_orchestrator, handle, l1_notification_tx, _) = config
         .clone()
         .build(
             RollupNodeContext::new(
@@ -655,7 +656,7 @@
     // Lets iterate over all blocks expected to be derived from the first batch commit.
     let consolidation_outcome = loop {
         let event = rnm_events.next().await;
-        println!("Received event: {:?}", event);
+        tracing::info!(target: "scroll::test", event = ?event, "Received event");
         if let Some(ChainOrchestratorEvent::BatchConsolidated(consolidation_outcome)) = event {
             break consolidation_outcome;
         }
@@ -724,7 +725,7 @@
 
     // Start the RNM again.
     let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
-    let (chain_orchestrator, handle, l1_notification_tx) = config
+    let (chain_orchestrator, handle, l1_notification_tx, _) = config
         .clone()
         .build(
             RollupNodeContext::new(
@@ -755,26 +756,58 @@
     // Request an event stream from the rollup node manager.
     let mut rnm_events = handle.get_event_listener().await?;
 
-    println!("im here");
+    // Send the first batch again to test skipping of an already known batch.
+    l1_notification_tx
+        .send(Arc::new(L1Notification::BatchCommit {
+            block_info: block_1_info,
+            data: batch_1_data.clone(),
+        }))
+        .await?;
+    loop {
+        select! {
+            _ = tokio::time::sleep(Duration::from_secs(5)) => {
+                bail!("Timed out waiting for duplicate batch commit event after RNM restart");
+            }
 
-    // Send the second batch again to mimic the watcher behaviour.
+            evt = rnm_events.next() => {
+                tracing::info!(target: "scroll::test", event = ?evt, "Received event");
+
+                if evt == Some(ChainOrchestratorEvent::BatchCommitDuplicate(batch_1_data.index)) {
+                    break;
+                }
+            }
+        }
+    }
+
+    // Send L1 finalized block event again to trigger consolidation from the last known safe block.
     let block_1_info = BlockInfo { number: 18318215, hash: B256::random() };
     l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_1_info.number))).await?;
 
+    let l2_block;
     // Lets fetch the first consolidated block event - this should be the first block of the batch.
-    let l2_block = loop {
-        if let Some(ChainOrchestratorEvent::BlockConsolidated(consolidation_outcome)) =
-            rnm_events.next().await
-        {
-            break consolidation_outcome.block_info().clone();
+    loop {
+        select! {
+            _ = tokio::time::sleep(Duration::from_secs(5)) => {
+                bail!("Timed out waiting for first consolidated block after RNM restart");
+            }
+
+            evt = rnm_events.next() => {
+                if let Some(ChainOrchestratorEvent::BlockConsolidated(consolidation_outcome)) = evt {
+                    l2_block = Some(consolidation_outcome.block_info().clone());
+                    break;
+                }
+
+                tracing::info!(target: "scroll::test", event = ?evt, "Received event");
+            }
         }
-    };
+    }
 
-    // One issue #273 is completed, we will again have safe blocks != finalized blocks, and this
-    // should be changed to 1. Assert that the consolidated block is the first block that was not
-    // previously processed of the batch.
+    // Assert that the consolidated block is the first block of the batch that was not previously
+    // processed. The last consolidated block before shutdown was block 40, so the next should be
+    // block 41. Since we apply blocks from a partially processed batch, this is expected, as
+    // described in https://github.com/scroll-tech/rollup-node/issues/411. Once we implement full
+    // batch atomicity (for setting safe blocks from a batch) this should change to block 5.
     assert_eq!(
-        l2_block.block_info.number, 41,
+        l2_block.unwrap().block_info.number,
+        41,
         "Consolidated block number does not match expected number"
     );
 
@@ -785,7 +818,7 @@
         if let Some(ChainOrchestratorEvent::BlockConsolidated(consolidation_outcome)) =
             rnm_events.next().await
         {
-            assert!(consolidation_outcome.block_info().block_info.number == i);
+            assert_eq!(consolidation_outcome.block_info().block_info.number, i);
             break;
         }
     }
@@ -840,7 +873,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() -> eyre::Result<()
     config.hydrate(node.inner.config.clone()).await?;
 
     let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
-    let (rnm, handle, l1_watcher_tx) = config
+    let (rnm, handle, l1_watcher_tx, _) = config
         .clone()
         .build(
             RollupNodeContext::new(
@@ -913,7 +946,7 @@
 
     // Start the RNM again.
     let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
-    let (rnm, handle, _) = config
+    let (rnm, handle, _, _) = config
         .clone()
         .build(
             RollupNodeContext::new(
@@ -1779,6 +1812,129 @@ async fn signer_rotation() -> eyre::Result<()> {
     Ok(())
 }
 
+// Test that the chain orchestrator detects gaps in batch commits, triggers a reset command to the
+// L1 watcher for self-healing, and skips duplicate batch commits.
+#[tokio::test]
+async fn test_batch_commit_gap() -> eyre::Result<()> {
+    reth_tracing::init_test_tracing();
+
+    let mut fixture = TestFixture::builder().sequencer().build().await?;
+
+    // Node is unsynced initially -> does not derive batches (which is what we want)
+
+    let batch_1_hash = B256::random();
+    // Send batch commit 1 to populate the database
+    fixture.l1().commit_batch().hash(batch_1_hash).index(1).block_number(1).send().await?;
+    fixture.expect_event().batch_commit_indexed(1, 1).await?;
+
+    // Send duplicate batch commit 1 - should be skipped and duplicate detected
+    fixture.l1().commit_batch().hash(batch_1_hash).index(1).block_number(1).send().await?;
+    fixture.expect_event().batch_commit_duplicates(1).await?;
+
+    // Send batch commit 3 - should trigger reset due to gap (missing batch 2)
+    fixture.l1().commit_batch().index(3).block_number(3).send().await?;
+    // Expect gap event: missing index 3, reset to L1 block 1 (where batch 1 was committed)
+    fixture.expect_event().batch_commit_gap(3, 1).await?;
+
+    // Process the reset command and update the notification sender for subsequent notifications
+    let reset_block = fixture.process_gap_recovery_command().await?;
+    assert_eq!(reset_block, 1, "Reset block should be the L1 block of the last known batch");
+
+    Ok(())
+}
+
+// Test that the chain orchestrator detects gaps in L1 messages, triggers a reset command to the
+// L1 watcher for self-healing, and skips duplicate L1 messages.
+#[tokio::test]
+async fn test_l1_message_gap() -> eyre::Result<()> {
+    reth_tracing::init_test_tracing();
+
+    let mut fixture = TestFixture::builder().sequencer().build().await?;
+
+    // Node is unsynced initially -> does not derive batches (which is what we want)
+
+    let to_address = Address::random();
+    let sender = Address::random();
+    // Send L1 message 0 to populate the database
+    fixture
+        .l1()
+        .add_message()
+        .queue_index(0)
+        .to(to_address)
+        .sender(sender)
+        .at_block(1)
+        .send()
+        .await?;
+    fixture.expect_event().l1_message_committed().await?;
+
+    // Send duplicate L1 message 0 - should be skipped and duplicate detected
+    fixture
+        .l1()
+        .add_message()
+        .queue_index(0)
+        .to(to_address)
+        .sender(sender)
+        .at_block(1)
+        .send()
+        .await?;
+    fixture.expect_event().l1_message_duplicate(0).await?;
+
+    // Send L1 message 2 - should trigger reset due to gap (missing L1 message 1)
+    fixture.l1().add_message().queue_index(2).at_block(3).send().await?;
+    // Expect gap event: missing index 2, reset to L1 block 1 (where message 0 was committed)
+    fixture.expect_event().l1_message_gap(2, 1).await?;
+
+    // Process the reset command and update the notification sender for subsequent notifications
+    let reset_block = fixture.process_gap_recovery_command().await?;
+    assert_eq!(reset_block, 1, "Reset block should be the L1 block of the last known L1 message");
+
+    Ok(())
+}
+
+// Test that the chain orchestrator detects gaps in batch reverts (missed revert events),
+// triggers a reset command to the L1 watcher for self-healing, and skips duplicate batch reverts.
+#[tokio::test]
+async fn test_batch_revert_gap() -> eyre::Result<()> {
+    reth_tracing::init_test_tracing();
+
+    let mut fixture = TestFixture::builder().sequencer().build().await?;
+
+    // Node is unsynced initially -> does not derive batches (which is what we want)
+
+    // Send batch commit 1 to populate the database
+    fixture.l1().commit_batch().index(1).block_number(1).send().await?;
+    fixture.expect_event().batch_commit_indexed(1, 1).await?;
+
+    // Send batch commit 2 to populate the database
+    fixture.l1().commit_batch().index(2).block_number(2).send().await?;
+    fixture.expect_event().batch_commit_indexed(2, 2).await?;
+
+    // Send batch commit 2_new - simulating a missed revert event
+    // This should trigger a BatchRevertGap because we're seeing batch 2 again at a different
+    // L1 block, which indicates we missed a revert event
+    fixture.l1().commit_batch().index(2).block_number(10).send().await?;
+    // Expect revert gap event: missing index 2, reset to L1 block 2 (where batch 2 was committed)
+    fixture.expect_event().batch_revert_gap(2, 2).await?;
+
+    // Process the reset command and update the notification sender for subsequent notifications
+    let reset_block = fixture.process_gap_recovery_command().await?;
+    assert_eq!(reset_block, 2, "Reset block should be the L1 block of the last known batch");
+
+    // Send actual revert for batch commit 2
+    fixture.l1().revert_batch_range().start(2).end(2).at_block_number(6).send().await?;
+    // can't assert event due to no safe block head being set
+
+    // Send duplicate batch revert for batch commit 2 - should be skipped and duplicate detected
+    fixture.l1().revert_batch_range().start(2).end(2).at_block_number(6).send().await?;
+    fixture.expect_event().batch_revert_duplicate(2).await?;
+
+    // Send batch commit 2_new again to continue normal processing
+    fixture.l1().commit_batch().index(2).block_number(10).send().await?;
+    fixture.expect_event().batch_commit_indexed(2, 10).await?;
+
+    Ok(())
+}
+
 /// Read the file provided at `path` as a [`Bytes`].
 pub fn read_to_bytes<P: AsRef<std::path::Path>>(path: P) -> eyre::Result<Bytes> {
     use std::str::FromStr;
diff --git a/crates/primitives/src/batch.rs b/crates/primitives/src/batch.rs
index 3b49dd8c..8fc5240b 100644
--- a/crates/primitives/src/batch.rs
+++ b/crates/primitives/src/batch.rs
@@ -63,6 +63,21 @@ impl From<&BatchCommitData> for BatchInfo {
     }
 }
 
+impl Default for BatchCommitData {
+    fn default() -> Self {
+        Self {
+            hash: B256::ZERO,
+            index: 0,
+            block_number: 0,
+            block_timestamp: 0,
+            calldata: Arc::new(Bytes::new()),
+            blob_versioned_hash: None,
+            finalized_block_number: None,
+            reverted_block_number: None,
+        }
+    }
+}
+
 /// The status of a batch.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum BatchStatus {
diff --git a/crates/watcher/src/handle/command.rs b/crates/watcher/src/handle/command.rs
new file mode 100644
index 00000000..0aeac750
--- /dev/null
+++ b/crates/watcher/src/handle/command.rs
@@ -0,0 +1,17 @@
+use crate::L1Notification;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+
+/// Commands that can be sent to the L1 Watcher.
+#[derive(Debug, Clone)]
+pub enum L1WatcherCommand {
+    /// Reset the watcher to a specific L1 block number.
+    ///
+    /// This is used for gap recovery when the chain orchestrator detects missing L1 events.
+    ResetToBlock {
+        /// The L1 block number to reset to (last known good state)
+        block: u64,
+        /// New sender to replace the current notification channel
+        new_sender: mpsc::Sender<Arc<L1Notification>>,
+    },
+}
diff --git a/crates/watcher/src/handle/mod.rs b/crates/watcher/src/handle/mod.rs
new file mode 100644
index 00000000..070e012b
--- /dev/null
+++ b/crates/watcher/src/handle/mod.rs
@@ -0,0 +1,58 @@
+//! Command handle for the L1 Watcher.
+
+mod command;
+
+pub use command::L1WatcherCommand;
+
+use crate::L1Notification;
+use std::sync::Arc;
+use tokio::sync::{mpsc, mpsc::UnboundedSender};
+
+/// Handle to interact with the L1 Watcher.
+#[derive(Debug)]
+pub struct L1WatcherHandle {
+    to_watcher_tx: UnboundedSender<L1WatcherCommand>,
+    l1_notification_rx: mpsc::Receiver<Arc<L1Notification>>,
+}
+
+impl L1WatcherHandle {
+    /// Create a new handle with the given command sender.
+    pub const fn new(
+        to_watcher_tx: UnboundedSender<L1WatcherCommand>,
+        l1_notification_rx: mpsc::Receiver<Arc<L1Notification>>,
+    ) -> Self {
+        Self { to_watcher_tx, l1_notification_rx }
+    }
+
+    /// Get a mutable reference to the L1 notification receiver.
+    pub const fn l1_notification_receiver(&mut self) -> &mut mpsc::Receiver<Arc<L1Notification>> {
+        &mut self.l1_notification_rx
+    }
+
+    /// Send a command to the watcher without waiting for a response.
+    fn send_command(&self, command: L1WatcherCommand) {
+        if let Err(err) = self.to_watcher_tx.send(command) {
+            tracing::error!(target: "scroll::watcher", ?err, "Failed to send command to L1 watcher");
+        }
+    }
+
+    /// Triggers gap recovery by resetting the L1 watcher to a specific block with a fresh channel.
+    pub async fn trigger_gap_recovery(&mut self, reset_block: u64) {
+        // Create a fresh notification channel
+        // Use the same capacity as the original channel
+        let capacity = self.l1_notification_rx.max_capacity();
+        let (new_tx, new_rx) = mpsc::channel(capacity);
+
+        // Send the reset command with the new sender
+        self.reset_to_block(reset_block, new_tx).await;
+
+        // Replace the receiver with the fresh channel
+        // The old channel is automatically dropped, discarding all stale notifications
+        self.l1_notification_rx = new_rx;
+    }
+
+    /// Reset the L1 Watcher to a specific block number with a fresh notification channel.
+ async fn reset_to_block(&self, block: u64, new_sender: mpsc::Sender<Arc<L1Notification>>) { + self.send_command(L1WatcherCommand::ResetToBlock { block, new_sender }); + } +} diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 8a09c3b1..40627bae 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -6,6 +6,9 @@ use cache::Cache; mod error; pub use error::{EthRequestError, FilterLogError, L1WatcherError}; +pub mod handle; +pub use handle::{L1WatcherCommand, L1WatcherHandle}; + mod metrics; pub use metrics::WatcherMetrics; @@ -36,7 +39,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::sync::mpsc; +use tokio::{select, sync::mpsc}; /// The maximum count of unfinalized blocks we can have in Ethereum. pub const MAX_UNFINALIZED_BLOCK_COUNT: usize = 96; @@ -91,6 +94,8 @@ pub struct L1Watcher<EP> { current_block_number: BlockNumber, /// The sender part of the channel for [`L1Notification`]. sender: mpsc::Sender<Arc<L1Notification>>, + /// The receiver part of the channel for [`L1WatcherCommand`]. + command_rx: mpsc::UnboundedReceiver<L1WatcherCommand>, /// The rollup node configuration. config: Arc<NodeConfig>, /// The metrics for the watcher. @@ -202,16 +207,18 @@ where EP: Provider + SystemContractProvider + 'static, { /// Spawn a new [`L1Watcher`], starting at `start_block`. The watcher will iterate the L1, - /// returning [`L1Notification`] in the returned channel. + /// returning [`L1Notification`]s via the returned [`L1WatcherHandle`], which also allows sending commands. pub async fn spawn( execution_provider: EP, l1_block_startup_info: L1BlockStartupInfo, config: Arc<NodeConfig>, log_query_block_range: u64, - ) -> mpsc::Receiver<Arc<L1Notification>> { + ) -> L1WatcherHandle { tracing::trace!(target: "scroll::watcher", ?l1_block_startup_info, ?config, "spawning L1 watcher"); let (tx, rx) = mpsc::channel(log_query_block_range as usize); + let (command_tx, command_rx) = mpsc::unbounded_channel(); + let handle = L1WatcherHandle::new(command_tx, rx); let fetch_block_info = async |tag: BlockNumberOrTag| { let block = loop { @@ -260,13 +267,14 @@ }; // init the watcher. - let watcher = Self { + let mut watcher = Self { execution_provider, unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY), current_block_number: start_block.saturating_sub(1), l1_state, cache: Cache::new(TRANSACTION_CACHE_CAPACITY), sender: tx, + command_rx, config, metrics: WatcherMetrics::default(), is_synced: false, @@ -289,14 +297,39 @@ .await .expect("channel is open in this context"); - tokio::spawn(watcher.run()); + tokio::spawn(async move { watcher.run().await }); - rx + handle } /// Main execution loop for the [`L1Watcher`]. - pub async fn run(mut self) { + pub async fn run(&mut self) { loop { + // Determine sleep duration based on sync state + let sleep_duration = if self.is_synced { SLOW_SYNC_INTERVAL } else { Duration::ZERO }; + + // Select between receiving commands and sleeping + select! { + result = self.command_rx.recv() => { + match result { + Some(command) => { + if let Err(err) = self.handle_command(command).await { + tracing::error!(target: "scroll::watcher", ?err, "error handling command"); + } + // Continue to process commands without stepping, in case there are more + continue; + } + None => { + tracing::warn!(target: "scroll::watcher", "command channel closed, stopping the watcher"); + break; + } + } + } + _ = tokio::time::sleep(sleep_duration) => { + // Sleep completed, proceed to step + } + } + // step the watcher. if let Err(L1WatcherError::SendError(_)) = self .step() @@ -307,10 +340,8 @@ break; } - // sleep if we are synced. 
- if self.is_synced { - tokio::time::sleep(SLOW_SYNC_INTERVAL).await; - } else if self.current_block_number == self.l1_state.head { + // Check if we just synced to the head + if !self.is_synced && self.current_block_number == self.l1_state.head { // if we have synced to the head of the L1, notify the channel and set the // `is_synced` flag. if let Err(L1WatcherError::SendError(_)) = self.notify(L1Notification::Synced).await @@ -323,6 +354,36 @@ } } + /// Handle a command sent via the handle. + async fn handle_command(&mut self, command: L1WatcherCommand) -> L1WatcherResult<()> { + match command { + L1WatcherCommand::ResetToBlock { block, new_sender } => { + self.handle_reset(block, new_sender).await?; + } + } + Ok(()) + } + + /// Reset the watcher to a specific block number with a fresh notification channel. + async fn handle_reset( + &mut self, + block: u64, + new_sender: mpsc::Sender<Arc<L1Notification>>, + ) -> L1WatcherResult<()> { + tracing::warn!(target: "scroll::watcher", "resetting L1 watcher to block {}", block); + + // Reset state + self.current_block_number = block; + self.unfinalized_blocks.clear(); + self.is_synced = false; + + // Replace the sender with the fresh channel + // This discards the old channel and any stale notifications in it + self.sender = new_sender; + + Ok(()) + } + /// A step of work for the [`L1Watcher`]. pub async fn step(&mut self) -> L1WatcherResult<()> { // handle the finalized block. @@ -765,7 +826,7 @@ where } /// Send all notifications on the channel. - async fn notify_all(&self, notifications: Vec<L1Notification>) -> L1WatcherResult<()> { + async fn notify_all(&mut self, notifications: Vec<L1Notification>) -> L1WatcherResult<()> { for notification in notifications { self.metrics.process_l1_notification(&notification); tracing::trace!(target: "scroll::watcher", %notification, "sending l1 notification"); @@ -775,10 +836,30 @@ } /// Send the notification in the channel. - async fn notify(&self, notification: L1Notification) -> L1WatcherResult<()> { - Ok(self.sender.send(Arc::new(notification)).await.inspect_err( - |err| tracing::error!(target: "scroll::watcher", ?err, "failed to send notification"), - )?) + async fn notify(&mut self, notification: L1Notification) -> L1WatcherResult<()> { + select! { + biased; + + Some(command) = self.command_rx.recv() => { + // If a command is received while trying to send a notification, + // we prioritize handling the command first. + // This prevents potential deadlocks if the channel is full. + tracing::trace!(target: "scroll::watcher", "command received while sending notification, prioritizing command handling"); + + if let Err(err) = self.handle_command(command).await { + tracing::error!(target: "scroll::watcher", ?err, "error handling command"); + } + + return Ok(()); + } + result = self.sender.send(Arc::new(notification)) => { + result.inspect_err( + |err| tracing::error!(target: "scroll::watcher", ?err, "failed to send notification"), + )?; + } + } + + Ok(()) + } /// Updates the current block number, saturating at the head of the chain. 
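Before the tests, it is worth spelling out how the two halves of the recovery path above fit together: `trigger_gap_recovery` swaps in a fresh notification channel, while the `biased` arm of `notify` guarantees the watcher still sees that command even when its notification channel is full and nobody is draining it. The stand-alone sketch below models just this pattern; `Producer` and `Command` are illustrative names, not the crate's API.

```rust
// Minimal sketch of the command-priority pattern used in `notify` above:
// a producer that would block on a full bounded channel instead yields to an
// incoming command, which installs a fresh channel. Not the rollup-node code.
use std::sync::Arc;
use tokio::sync::mpsc;

enum Command {
    Reset { new_sender: mpsc::Sender<Arc<u64>> },
}

struct Producer {
    sender: mpsc::Sender<Arc<u64>>,
    command_rx: mpsc::UnboundedReceiver<Command>,
}

impl Producer {
    async fn notify(&mut self, value: u64) {
        tokio::select! {
            biased;
            // Commands win the race: if the notification channel is full and
            // nobody drains it, this branch keeps the producer responsive.
            Some(Command::Reset { new_sender }) = self.command_rx.recv() => {
                // The old channel (and its stale backlog) is simply dropped.
                self.sender = new_sender;
            }
            res = self.sender.send(Arc::new(value)) => {
                let _ = res; // receiver may be gone; ignore in this sketch
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);
    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
    let mut producer = Producer { sender: tx, command_rx: cmd_rx };

    producer.notify(1).await; // fills the only slot; `rx` is never drained
    drop(rx);

    // Without the biased select, the next send would await forever on the
    // full channel. The reset command interrupts it and installs a new one.
    let (new_tx, mut new_rx) = mpsc::channel(1);
    cmd_tx.send(Command::Reset { new_sender: new_tx }).unwrap();
    producer.notify(2).await; // returns after handling the command

    producer.notify(3).await; // goes out on the fresh channel
    assert_eq!(*new_rx.recv().await.unwrap(), 3);
    println!("recovered without deadlock");
}
```

Note the trade-off this models: the notification that raced against the command (value `2` here) is intentionally dropped, which is safe in the watcher's setting because a reset rewinds to a known-good block and replays everything after it.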
@@ -864,7 +945,7 @@ mod tests { transactions: Vec<Transaction>, finalized: Header, latest: Header, - ) -> (L1Watcher, mpsc::Receiver<Arc<L1Notification>>) { + ) -> (L1Watcher, L1WatcherHandle) { let provider_blocks = provider_blocks.into_iter().map(|h| Block { header: h, ..Default::default() }); let finalized = Block { header: finalized, ..Default::default() }; @@ -878,6 +959,9 @@ ); let (tx, rx) = mpsc::channel(LOG_QUERY_BLOCK_RANGE as usize); + let (command_tx, command_rx) = mpsc::unbounded_channel(); + let handle = L1WatcherHandle::new(command_tx, rx); + ( L1Watcher { execution_provider: provider, @@ -886,12 +970,13 @@ cache: Cache::new(TRANSACTION_CACHE_CAPACITY), current_block_number: 0, sender: tx, + command_rx, config: Arc::new(NodeConfig::mainnet()), metrics: WatcherMetrics::default(), is_synced: false, log_query_block_range: LOG_QUERY_BLOCK_RANGE, }, - rx, + handle, ) } @@ -901,7 +986,7 @@ let (finalized, latest, chain) = chain(21); let unfinalized_blocks = chain[1..11].to_vec(); - let (watcher, _) = l1_watcher( + let (watcher, _handle) = l1_watcher( unfinalized_blocks, chain.clone(), vec![], @@ -926,7 +1011,7 @@ let mut provider_blocks = chain_from(&chain[10], 10); let latest = provider_blocks[9].clone(); - let (watcher, _) = l1_watcher( + let (watcher, _handle) = l1_watcher( unfinalized_blocks, provider_blocks.clone(), vec![], @@ -949,7 +1034,7 @@ async fn test_should_handle_finalized_with_empty_state() -> eyre::Result<()> { // Given let (finalized, latest, _) = chain(2); - let (mut watcher, _rx) = l1_watcher(vec![], vec![], vec![], finalized.clone(), latest); + let (mut watcher, _handle) = l1_watcher(vec![], vec![], vec![], finalized.clone(), latest); // When watcher.handle_finalized_block(&finalized).await?; @@ -965,7 +1050,7 @@ // Given let (_, latest, chain) = chain(10); let finalized = chain[5].clone(); - let (mut watcher, _rx) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); + let (mut watcher, _handle) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); // When watcher.handle_finalized_block(&finalized).await?; @@ -981,7 +1066,7 @@ // Given let (_, latest, chain) = chain(10); let finalized = latest.clone(); - let (mut watcher, _rx) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); + let (mut watcher, _handle) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); // When watcher.handle_finalized_block(&finalized).await?; @@ -996,7 +1081,8 @@ async fn test_should_match_unfinalized_tail() -> eyre::Result<()> { // Given let (finalized, latest, chain) = chain(10); - let (mut watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); + let (mut watcher, _handle) = + l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // When watcher.handle_latest_block(&finalized, &latest).await?; @@ -1013,7 +1099,7 @@ // Given let (finalized, latest, chain) = chain(10); let unfinalized_chain = chain[..9].to_vec(); - let (mut watcher, _rx) = + let (mut watcher, _handle) = l1_watcher(unfinalized_chain, vec![], vec![], finalized.clone(), latest.clone()); assert_eq!(watcher.unfinalized_blocks.len(), 9); @@ -1033,7 +1119,7 @@ // Given let (finalized, latest, chain) = chain(10); let unfinalized_chain = chain[..5].to_vec(); - let (mut watcher, mut receiver) = + let (mut watcher, mut handle) = l1_watcher(unfinalized_chain, chain, vec![], finalized.clone(), latest.clone()); // When @@ -1042,7 +1128,7 @@ // Then 
assert_eq!(watcher.unfinalized_blocks.len(), 10); assert_eq!(watcher.unfinalized_blocks.pop().unwrap(), latest); - let notification = receiver.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert!(matches!(*notification, L1Notification::NewBlock(_))); Ok(()) @@ -1054,7 +1140,7 @@ let (finalized, _, chain) = chain(10); let reorged = chain_from(&chain[5], 10); let latest = reorged[9].clone(); - let (mut watcher, mut receiver) = + let (mut watcher, mut handle) = l1_watcher(chain.clone(), reorged, vec![], finalized.clone(), latest.clone()); // When @@ -1065,9 +1151,9 @@ assert_eq!(watcher.unfinalized_blocks.pop().unwrap(), latest); assert_eq!(watcher.current_block_number, chain[5].number); - let notification = receiver.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert!(matches!(*notification, L1Notification::Reorg(_))); - let notification = receiver.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert!(matches!(*notification, L1Notification::NewBlock(_))); Ok(()) @@ -1077,7 +1163,8 @@ async fn test_should_handle_l1_messages() -> eyre::Result<()> { // Given let (finalized, latest, chain) = chain(10); - let (watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); + let (watcher, _handle) = + l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // build test logs. let mut logs = (0..10).map(|_| random!(Log)).collect::<Vec<_>>(); @@ -1116,7 +1203,7 @@ effective_gas_price: None, }; - let (mut watcher, _) = + let (mut watcher, _handle) = l1_watcher(chain, vec![], vec![tx.clone()], finalized.clone(), latest.clone()); // build test logs. @@ -1202,7 +1289,8 @@ async fn test_should_handle_finalize_commits() -> eyre::Result<()> { // Given let (finalized, latest, chain) = chain(10); - let (watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); + let (watcher, _handle) = + l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // build test logs. 
let mut logs = (0..10).map(|_| random!(Log)).collect::<Vec<_>>(); @@ -1224,4 +1312,77 @@ Ok(()) } + + #[tokio::test] + async fn test_handle_trigger_gap_recovery() -> eyre::Result<()> { + // Given: A watcher with state + let (finalized, latest, chain) = chain(10); + let unfinalized_blocks = chain[1..5].to_vec(); + let (mut watcher, mut handle) = + l1_watcher(unfinalized_blocks.clone(), chain, vec![], finalized, latest); + + watcher.current_block_number = unfinalized_blocks.last().unwrap().number; + watcher.is_synced = true; + assert_eq!(watcher.unfinalized_blocks.len(), 4); + + let join = tokio::spawn(async move { + // When: Reset to block 2 + handle.trigger_gap_recovery(2).await; + + // close channel to end watcher run loop + drop(handle); + }); + + watcher.run().await; + + join.await?; + + // Then: State should be reset + assert_eq!(watcher.current_block_number, 2); + assert_eq!(watcher.unfinalized_blocks.len(), 0, "unfinalized blocks should be cleared"); + assert!(!watcher.is_synced, "is_synced should be reset to false"); + + Ok(()) + } + + #[tokio::test] + async fn test_handle_deadlock_prevention() -> eyre::Result<()> { + let (finalized, latest, chain) = chain(10); + let unfinalized_blocks = chain[1..5].to_vec(); + let (mut watcher, mut handle) = + l1_watcher(unfinalized_blocks.clone(), chain, vec![], finalized, latest); + + // When: Fill the channel to capacity LOG_QUERY_BLOCK_RANGE + for i in 0..LOG_QUERY_BLOCK_RANGE { + watcher + .notify(L1Notification::NewBlock(BlockInfo { number: i, hash: random!(B256) })) + .await?; + } + + assert_eq!(watcher.current_block_number, 0, "current block number should remain unchanged"); + + // Channel is now full. Spawn a task that will try to send another notification + // This blocks until we send the command to reset. + let watcher_handle_task = tokio::spawn(async move { + // This would normally block, but the reset command should interrupt it + let result = watcher + .notify(L1Notification::NewBlock(BlockInfo { number: 1000, hash: random!(B256) })) + .await; + // After reset is handled, the notify returns without sending + (watcher, result) + }); + + // Give the notify a chance to start blocking + tokio::time::sleep(Duration::from_millis(50)).await; + + // Then: Send reset command - this should NOT deadlock + tokio::time::timeout(Duration::from_secs(1), handle.trigger_gap_recovery(100)).await?; + + // Verify the watcher processed the reset + let (watcher, notify_result) = watcher_handle_task.await?; + assert!(notify_result.is_ok(), "Notify should complete after handling reset"); + assert_eq!(watcher.current_block_number, 100, "watcher should be reset to block 100"); + + Ok(()) + } } diff --git a/crates/watcher/tests/indexing.rs b/crates/watcher/tests/indexing.rs index c1fdc040..eae7a2ff 100644 --- a/crates/watcher/tests/indexing.rs +++ b/crates/watcher/tests/indexing.rs @@ -59,7 +59,7 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn( + let mut handle = L1Watcher::spawn( mock_provider, L1BlockStartupInfo::None, Arc::new(config), @@ -72,7 +72,7 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> loop { select! { - notification = l1_watcher.recv() => { + notification = handle.l1_notification_receiver().recv() => { let notification = notification.map(|notif| (*notif).clone()); if let Some(L1Notification::L1Message { block_info, .. 
}) = notification { assert_ne!(prev_block_info, block_info, "indexed same block twice {block_info}"); diff --git a/crates/watcher/tests/logs.rs b/crates/watcher/tests/logs.rs index 0e689e4d..25f22e66 100644 --- a/crates/watcher/tests/logs.rs +++ b/crates/watcher/tests/logs.rs @@ -13,6 +13,7 @@ use rollup_node_watcher::{ use scroll_alloy_consensus::TxL1Message; use scroll_l1::abi::logs::{try_decode_log, QueueTransaction}; use std::sync::Arc; +use tokio::select; #[tokio::test] async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { @@ -64,7 +65,7 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn( + let mut handle = L1Watcher::spawn( mock_provider, L1BlockStartupInfo::None, Arc::new(config), @@ -72,12 +73,23 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { ) .await; let mut received_logs = Vec::new(); + + // make sure we time out if we don't receive the expected logs loop { - let notification = l1_watcher.recv().await.map(|notif| (*notif).clone()); - if let Some(L1Notification::L1Message { block_timestamp, message, .. }) = notification { - received_logs.push(message); - if block_timestamp == last_log.block_timestamp.unwrap() { - break + select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => { + eyre::bail!("Timed out waiting for logs"); + } + notif = handle.l1_notification_receiver().recv() => { + let notification = notif.map(|notif| (*notif).clone()); + if let Some(L1Notification::L1Message { block_timestamp, message, .. }) = notification { + received_logs.push(message); + if block_timestamp == last_log.block_timestamp.unwrap() { + break + } + } else if notification.is_none() { + break // channel closed + } } } } diff --git a/crates/watcher/tests/reorg.rs b/crates/watcher/tests/reorg.rs index 6c81b9c8..3b76b469 100644 --- a/crates/watcher/tests/reorg.rs +++ b/crates/watcher/tests/reorg.rs @@ -72,7 +72,7 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn( + let mut handle = L1Watcher::spawn( mock_provider, L1BlockStartupInfo::None, Arc::new(config), @@ -81,8 +81,8 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { .await; // skip the first two events - l1_watcher.recv().await.unwrap(); - l1_watcher.recv().await.unwrap(); + handle.l1_notification_receiver().recv().await.unwrap(); + handle.l1_notification_receiver().recv().await.unwrap(); let mut latest_number = latest_blocks.first().unwrap().header.number; let mut finalized_number = finalized_blocks.first().unwrap().header.number; @@ -90,10 +90,10 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) { // check finalized first. 
if finalized_number < finalized.header.number { - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = handle.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = handle.l1_notification_receiver().recv().await.unwrap(); } assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); } @@ -102,23 +102,23 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { continue; } - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = handle.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = handle.l1_notification_receiver().recv().await.unwrap(); } // skip the `L1Notification::Synced` notifications if matches!(notification.as_ref(), L1Notification::Synced) { - notification = l1_watcher.recv().await.unwrap(); + notification = handle.l1_notification_receiver().recv().await.unwrap(); } // check latest for reorg or new block. if latest_number > latest.header.number { // reorg assert!(matches!(notification.as_ref(), L1Notification::Reorg(_))); - let notification = l1_watcher.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); } else { assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); @@ -179,7 +179,7 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn( + let mut handle = L1Watcher::spawn( mock_provider, L1BlockStartupInfo::None, Arc::new(config), @@ -188,8 +188,8 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { .await; // skip the first two events - l1_watcher.recv().await.unwrap(); - l1_watcher.recv().await.unwrap(); + handle.l1_notification_receiver().recv().await.unwrap(); + handle.l1_notification_receiver().recv().await.unwrap(); let mut latest_number = latest_blocks.first().unwrap().header.number; let mut finalized_number = finalized_blocks.first().unwrap().header.number; @@ -197,10 +197,10 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) { // check finalized first. 
if finalized_number < finalized.header.number { - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = handle.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = handle.l1_notification_receiver().recv().await.unwrap(); } assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); } @@ -209,16 +209,16 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { continue; } - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = handle.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = handle.l1_notification_receiver().recv().await.unwrap(); } // skip the `L1Notification::Synced` notifications if matches!(notification.as_ref(), L1Notification::Synced) { - notification = l1_watcher.recv().await.unwrap(); + notification = handle.l1_notification_receiver().recv().await.unwrap(); } assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into()));
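A pattern worth noting in the updated integration tests above: every `recv()` that could hang is raced against a timer, so a missed notification fails the test quickly instead of stalling the suite. Below is a minimal stand-alone sketch of that guard using plain tokio types only; none of the watcher's types are involved.

```rust
// Sketch of the timeout-guarded receive used in the updated tests: wrapping
// `recv()` in `tokio::time::timeout` (or racing it against a sleep inside a
// `select!`, as `logs.rs` does) turns a hung channel into a prompt failure.
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel::<u64>(8);
    tx.send(42).await?;

    // Happy path: the message is already buffered, so this returns at once.
    let first = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await?;
    assert_eq!(first, Some(42));

    // Sad path: nothing else is sent, so the timer fires instead of hanging.
    let second = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
    assert!(second.is_err(), "expected a timeout on an idle channel");
    Ok(())
}
```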