From f8fa0b1a6dac1b6d171013641f1b44c00132a3b6 Mon Sep 17 00:00:00 2001
From: Shahak Shama
Date: Tue, 16 Jun 2026 14:18:25 +0300
Subject: [PATCH] apollo_l1_events: reuse running sync task on catch-up restart by raising its shared target height
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Goal: Fix audit finding H-14 — unbounded task leak in the L1EventsProvider
catch-up flow.

Change summary:
- On catch-up restart, reuse the in-flight l2_sync_task instead of recreating
  the Catchupper and spawning a second one. target_height is now an
  Arc<AtomicU64> shared with the running task; start_catching_up raises it via
  update_target_height when a task is already running, and the task re-reads
  it each iteration so it keeps syncing up to the new target.
- Dropping a tokio JoinHandle does not cancel the task, so before this change,
  every catch-up restart leaked a still-running sync task that races to commit
  the same blocks and can spin forever in its retry loop (DoS).
- Add regression test
  restarting_catch_up_raises_target_without_spawning_second_sync_task.

Decision points:
- Keep the running task and raise its shared target rather than abort +
  respawn: aborting a task parked on a client call is not cancel-safe (the
  commit may apply while its result is never observed). A single long-lived
  task that tracks the latest target avoids both the leak and the
  cancel-safety hazard.
- Atomic orderings: Release on the writer (update_target_height), Acquire on
  the readers, since there is a single writer (provider) and a single reader
  (sync task).

Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/apollo_l1_events/src/catchupper.rs | 42 ++++++++++---- .../src/l1_events_provider.rs | 12 +++- .../src/l1_events_provider_tests.rs | 56 +++++++++++++++++++ crates/apollo_l1_events/src/test_utils.rs | 2 +- 4 files changed, 98 insertions(+), 14 deletions(-) diff --git a/crates/apollo_l1_events/src/catchupper.rs b/crates/apollo_l1_events/src/catchupper.rs index d34f9d6a161..d41f0c9aa35 100644 --- a/crates/apollo_l1_events/src/catchupper.rs +++ b/crates/apollo_l1_events/src/catchupper.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -20,7 +20,9 @@ use tracing::{debug, warn}; /// Caches commits to be applied later. This flow is only relevant while the node is starting up. #[derive(Clone)] pub struct Catchupper { - pub target_height: BlockNumber, + // Shared with the running sync task (rather than passed by value) so the target can be raised + // while the task is in flight; see `update_target_height`. + pub target_height: Arc, pub sync_retry_interval: Duration, pub commit_block_backlog: Vec, pub l1_events_provider_client: SharedL1EventsProviderClient, @@ -49,13 +51,13 @@ impl Catchupper { n_sync_health_check_failures: Default::default(), // This is overriden when starting the sync task (e.g., when provider starts // catching up). - target_height: BlockNumber(0), + target_height: Default::default(), } } /// Check if the caller has caught up with the catchupper. 
pub fn is_caught_up(&self, current_provider_height: BlockNumber) -> bool { - let is_caught_up = current_provider_height > self.target_height; + let is_caught_up = current_provider_height > self.target_height(); self.sync_task_health_check(is_caught_up); @@ -86,7 +88,8 @@ impl Catchupper { current_provider_height: BlockNumber, target_height: BlockNumber, ) { - self.target_height = target_height; + // Fresh shared target for this task; cloned into the task below so it shares the same cell. + self.target_height = Arc::new(AtomicU64::new(target_height.0)); // FIXME: spawning a task like this is evil. // However, we aren't using the task executor, so no choice :( // Once we start using a centralized threadpool, spawn through it instead of the @@ -95,7 +98,7 @@ impl Catchupper { self.l1_events_provider_client.clone(), self.sync_client.clone(), current_provider_height, - target_height, + self.target_height.clone(), self.sync_retry_interval, )); @@ -103,7 +106,18 @@ impl Catchupper { } pub fn target_height(&self) -> BlockNumber { - self.target_height + BlockNumber(self.target_height.load(Ordering::Acquire)) + } + + /// Raises the target height of the running sync task so it keeps syncing up to `target_height`. + /// Uses `fetch_max` so the target only moves forward; a lower height is ignored. + pub fn update_target_height(&self, target_height: BlockNumber) { + self.target_height.fetch_max(target_height.0, Ordering::Release); + } + + /// Returns true while an L2 sync task is in flight (spawned and not yet finished). 
+ pub fn is_sync_task_running(&self) -> bool { + matches!(&self.sync_task_handle, SyncTaskHandle::Started(sync_task) if !sync_task.is_finished()) } fn sync_task_health_check(&self, is_caught_up: bool) { @@ -129,7 +143,7 @@ impl Catchupper { impl PartialEq for Catchupper { fn eq(&self, other: &Self) -> bool { - self.target_height == other.target_height + self.target_height() == other.target_height() && self.commit_block_backlog == other.commit_block_backlog } } @@ -139,7 +153,7 @@ impl Eq for Catchupper {} impl std::fmt::Debug for Catchupper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Catchupper") - .field("target_height", &self.target_height) + .field("target_height", &self.target_height()) .field("commit_block_backlog", &self.commit_block_backlog) .field("sync_task_handle", &self.sync_task_handle) .finish_non_exhaustive() @@ -150,14 +164,18 @@ async fn l2_sync_task( l1_events_provider_client: SharedL1EventsProviderClient, sync_client: SharedStateSyncClient, mut current_height: BlockNumber, - target_height: BlockNumber, + target_height: Arc, retry_interval: Duration, ) { - while current_height <= target_height { + // The target is re-read every iteration so an `update_target_height` call from the provider + // (a higher block committed before catch-up finishes) extends this same task instead of + // spawning a competing one. + while current_height.0 <= target_height.load(Ordering::Acquire) { // TODO(Gilad): add tracing instrument. 
debug!( "Syncing L1EventsProvider with L2 height: {} to target height: {}", - current_height, target_height + current_height, + target_height.load(Ordering::Acquire) ); let block = sync_client.get_block(current_height).await.inspect_err(|err| debug!("{err}")); diff --git a/crates/apollo_l1_events/src/l1_events_provider.rs b/crates/apollo_l1_events/src/l1_events_provider.rs index 0c9de482a51..27b7eaf9033 100644 --- a/crates/apollo_l1_events/src/l1_events_provider.rs +++ b/crates/apollo_l1_events/src/l1_events_provider.rs @@ -351,8 +351,18 @@ impl L1EventsProvider { /// Go from current state to CatchingUp state and start the L2 sync. pub fn start_catching_up(&mut self, target_height: BlockNumber) { - self.reset_catchupper(); self.state = ProviderState::CatchingUp; + // If a sync task is already bootstrapping the provider, raise its target instead of + // spawning a second one. + // The running task re-reads the shared target and keeps syncing up to it. + // + // We can't abort the running task safely (a client call is not cancel-safe) + // Recreating the catchupper here and detaching the current task would leak racing tasks. 
+ if self.catchupper.is_sync_task_running() { + self.catchupper.update_target_height(target_height); + return; + } + self.reset_catchupper(); self.catchupper.start_l2_sync(self.current_height, target_height); } diff --git a/crates/apollo_l1_events/src/l1_events_provider_tests.rs b/crates/apollo_l1_events/src/l1_events_provider_tests.rs index 8dc6d1f6e07..c04c2914e94 100644 --- a/crates/apollo_l1_events/src/l1_events_provider_tests.rs +++ b/crates/apollo_l1_events/src/l1_events_provider_tests.rs @@ -1860,6 +1860,62 @@ async fn test_stuck_sync() { } } +#[tokio::test] +async fn restarting_catch_up_raises_target_without_spawning_second_sync_task() { + const STARTUP_HEIGHT: BlockNumber = BlockNumber(1); + const TARGET_HEIGHT: BlockNumber = BlockNumber(10); + const HIGHER_TARGET_HEIGHT: BlockNumber = BlockNumber(20); + + // Always fail to fetch blocks, so the sync task never finishes on its own: it loops forever, + // sleeping between retries. This keeps a task in flight across the restart below. + let mut sync_client = MockStateSyncClient::default(); + sync_client + .expect_get_block() + .returning(|height| Err(StateSyncError::BlockNotFound(height).into())); + let config = L1EventsProviderConfig { + startup_sync_sleep_retry_interval_seconds: Duration::from_millis(10), + ..Default::default() + }; + let mut l1_events_provider = L1EventsProvider::new( + config, + Arc::new(FakeL1EventsProviderClient::default()), + Arc::new(sync_client), + None, + ); + + // Start catching up and capture a handle to the first (forever-running) sync task. 
+ l1_events_provider.initialize(STARTUP_HEIGHT, Default::default()).await.unwrap(); + l1_events_provider.start_catching_up(TARGET_HEIGHT); + let SyncTaskHandle::Started(first_sync_task) = + l1_events_provider.catchupper.sync_task_handle.clone() + else { + panic!("First sync task should have started."); + }; + assert!(!first_sync_task.is_finished(), "First sync task should be forever running."); + assert_eq!(l1_events_provider.catchupper.target_height(), TARGET_HEIGHT); + + // Restart catching up with a higher target while the first task is still bootstrapping. This + // must NOT spawn a second task (a leaked task would race to commit the same blocks and spin in + // a retry loop forever): the existing task is kept and its shared target is raised instead. + l1_events_provider.start_catching_up(HIGHER_TARGET_HEIGHT); + + let SyncTaskHandle::Started(sync_task_after_restart) = + l1_events_provider.catchupper.sync_task_handle.clone() + else { + panic!("Sync task handle should still be present after restart."); + }; + assert!( + Arc::ptr_eq(&first_sync_task, &sync_task_after_restart), + "Restarting catch-up spawned a second sync task instead of reusing the running one." + ); + assert!(!first_sync_task.is_finished(), "The original sync task should still be running."); + assert_eq!( + l1_events_provider.catchupper.target_height(), + HIGHER_TARGET_HEIGHT, + "Restarting catch-up should have raised the running task's target height." + ); +} + #[tokio::test] async fn provider_initialized_in_pending_is_same_as_uninitialized_after_getting_initialize() { // Setup. diff --git a/crates/apollo_l1_events/src/test_utils.rs b/crates/apollo_l1_events/src/test_utils.rs index e03704610f7..84e36465b18 100644 --- a/crates/apollo_l1_events/src/test_utils.rs +++ b/crates/apollo_l1_events/src/test_utils.rs @@ -44,7 +44,7 @@ macro_rules! 
make_catchupper { committed_txs: [$(tx_hash!($tx)),*].into() }),* ].into_iter().collect(), - target_height: BlockNumber(0), + target_height: Default::default(), l1_events_provider_client: Arc::new(FakeL1EventsProviderClient::default()), sync_client: Arc::new(MockStateSyncClient::default()), sync_task_handle: SyncTaskHandle::default(),