Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions crates/apollo_l1_events/src/catchupper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -20,7 +20,9 @@ use tracing::{debug, warn};
/// Caches commits to be applied later. This flow is only relevant while the node is starting up.
#[derive(Clone)]
pub struct Catchupper {
pub target_height: BlockNumber,
// Shared with the running sync task (rather than passed by value) so the target can be raised
// while the task is in flight; see `update_target_height`.
pub target_height: Arc<AtomicU64>,
pub sync_retry_interval: Duration,
pub commit_block_backlog: Vec<CommitBlockBacklog>,
pub l1_events_provider_client: SharedL1EventsProviderClient,
Expand Down Expand Up @@ -49,13 +51,13 @@ impl Catchupper {
n_sync_health_check_failures: Default::default(),
// This is overridden when starting the sync task (e.g., when provider starts
// catching up).
target_height: BlockNumber(0),
target_height: Default::default(),
}
}

/// Check if the caller has caught up with the catchupper.
pub fn is_caught_up(&self, current_provider_height: BlockNumber) -> bool {
let is_caught_up = current_provider_height > self.target_height;
let is_caught_up = current_provider_height > self.target_height();

self.sync_task_health_check(is_caught_up);

Expand Down Expand Up @@ -86,7 +88,8 @@ impl Catchupper {
current_provider_height: BlockNumber,
target_height: BlockNumber,
) {
self.target_height = target_height;
// Fresh shared target for this task; cloned into the task below so it shares the same cell.
self.target_height = Arc::new(AtomicU64::new(target_height.0));
// FIXME: spawning a task like this is evil.
// However, we aren't using the task executor, so no choice :(
// Once we start using a centralized threadpool, spawn through it instead of the
Expand All @@ -95,15 +98,26 @@ impl Catchupper {
self.l1_events_provider_client.clone(),
self.sync_client.clone(),
current_provider_height,
target_height,
self.target_height.clone(),
self.sync_retry_interval,
));

self.sync_task_handle = SyncTaskHandle::Started(sync_task_handle.into());
}

pub fn target_height(&self) -> BlockNumber {
self.target_height
BlockNumber(self.target_height.load(Ordering::Acquire))
}

/// Raises the target height of the running sync task so it keeps syncing up to `target_height`.
/// Uses `fetch_max` so the target only moves forward; a lower height is ignored.
pub fn update_target_height(&self, target_height: BlockNumber) {
self.target_height.fetch_max(target_height.0, Ordering::Release);
Comment thread
asaf-sw marked this conversation as resolved.
}

/// Reports whether an L2 sync task is currently in flight.
///
/// Returns `true` only when the handle is in the `Started` state and the task it refers to has
/// not finished yet; every other handle state yields `false`.
pub fn is_sync_task_running(&self) -> bool {
    match &self.sync_task_handle {
        SyncTaskHandle::Started(sync_task) => !sync_task.is_finished(),
        _ => false,
    }
}

fn sync_task_health_check(&self, is_caught_up: bool) {
Expand All @@ -129,7 +143,7 @@ impl Catchupper {

impl PartialEq for Catchupper {
fn eq(&self, other: &Self) -> bool {
self.target_height == other.target_height
self.target_height() == other.target_height()
&& self.commit_block_backlog == other.commit_block_backlog
}
}
Expand All @@ -139,7 +153,7 @@ impl Eq for Catchupper {}
impl std::fmt::Debug for Catchupper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Catchupper")
.field("target_height", &self.target_height)
.field("target_height", &self.target_height())
.field("commit_block_backlog", &self.commit_block_backlog)
.field("sync_task_handle", &self.sync_task_handle)
.finish_non_exhaustive()
Expand All @@ -150,14 +164,18 @@ async fn l2_sync_task(
l1_events_provider_client: SharedL1EventsProviderClient,
sync_client: SharedStateSyncClient,
mut current_height: BlockNumber,
target_height: BlockNumber,
target_height: Arc<AtomicU64>,
retry_interval: Duration,
) {
while current_height <= target_height {
// The target is re-read every iteration so an `update_target_height` call from the provider
// (a higher block committed before catch-up finishes) extends this same task instead of
// spawning a competing one.
while current_height.0 <= target_height.load(Ordering::Acquire) {
// TODO(Gilad): add tracing instrument.
debug!(
"Syncing L1EventsProvider with L2 height: {} to target height: {}",
current_height, target_height
current_height,
target_height.load(Ordering::Acquire)
);
let block = sync_client.get_block(current_height).await.inspect_err(|err| debug!("{err}"));

Expand Down
12 changes: 11 additions & 1 deletion crates/apollo_l1_events/src/l1_events_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,18 @@ impl L1EventsProvider {

/// Go from current state to CatchingUp state and start the L2 sync.
pub fn start_catching_up(&mut self, target_height: BlockNumber) {
self.reset_catchupper();
self.state = ProviderState::CatchingUp;
// If a sync task is already bootstrapping the provider, raise its target instead of
// spawning a second one.
// The running task re-reads the shared target and keeps syncing up to it.
//
// We can't abort the running task safely (a client call is not cancel-safe)
// Recreating the catchupper here and detaching the current task would leak racing tasks.
if self.catchupper.is_sync_task_running() {
Comment thread
asaf-sw marked this conversation as resolved.
self.catchupper.update_target_height(target_height);
return;
Comment thread
ShahakShama marked this conversation as resolved.
}
self.reset_catchupper();
self.catchupper.start_l2_sync(self.current_height, target_height);
}

Expand Down
56 changes: 56 additions & 0 deletions crates/apollo_l1_events/src/l1_events_provider_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1860,6 +1860,62 @@ async fn test_stuck_sync() {
}
}

/// Calling `start_catching_up` a second time while a sync task is still in flight must keep that
/// task (same handle) and only raise the target height it is syncing towards.
#[tokio::test]
async fn restarting_catch_up_raises_target_without_spawning_second_sync_task() {
    const STARTUP_HEIGHT: BlockNumber = BlockNumber(1);
    const TARGET_HEIGHT: BlockNumber = BlockNumber(10);
    const HIGHER_TARGET_HEIGHT: BlockNumber = BlockNumber(20);

    // Every block fetch fails, so the sync task cannot complete by itself; it is expected to keep
    // retrying (with a short sleep in between) and therefore stays in flight for the whole test.
    let mut failing_sync_client = MockStateSyncClient::default();
    failing_sync_client
        .expect_get_block()
        .returning(|height| Err(StateSyncError::BlockNotFound(height).into()));

    let mut provider = L1EventsProvider::new(
        L1EventsProviderConfig {
            startup_sync_sleep_retry_interval_seconds: Duration::from_millis(10),
            ..Default::default()
        },
        Arc::new(FakeL1EventsProviderClient::default()),
        Arc::new(failing_sync_client),
        None,
    );

    // First catch-up: grab a handle to the sync task it starts.
    provider.initialize(STARTUP_HEIGHT, Default::default()).await.unwrap();
    provider.start_catching_up(TARGET_HEIGHT);
    let original_task = match provider.catchupper.sync_task_handle.clone() {
        SyncTaskHandle::Started(task) => task,
        _ => panic!("First sync task should have started."),
    };
    assert!(!original_task.is_finished(), "First sync task should be forever running.");
    assert_eq!(provider.catchupper.target_height(), TARGET_HEIGHT);

    // Second catch-up with a higher target while the first task is still running. No new task may
    // be spawned (a leaked one would race the original over the same blocks); instead the existing
    // task is reused and its shared target is bumped.
    provider.start_catching_up(HIGHER_TARGET_HEIGHT);

    let task_after_restart = match provider.catchupper.sync_task_handle.clone() {
        SyncTaskHandle::Started(task) => task,
        _ => panic!("Sync task handle should still be present after restart."),
    };
    // Pointer equality proves it is the very same task handle, not a fresh one.
    assert!(
        Arc::ptr_eq(&original_task, &task_after_restart),
        "Restarting catch-up spawned a second sync task instead of reusing the running one."
    );
    assert!(!original_task.is_finished(), "The original sync task should still be running.");
    assert_eq!(
        provider.catchupper.target_height(),
        HIGHER_TARGET_HEIGHT,
        "Restarting catch-up should have raised the running task's target height."
    );
}

#[tokio::test]
async fn provider_initialized_in_pending_is_same_as_uninitialized_after_getting_initialize() {
// Setup.
Expand Down
2 changes: 1 addition & 1 deletion crates/apollo_l1_events/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ macro_rules! make_catchupper {
committed_txs: [$(tx_hash!($tx)),*].into()
}),*
].into_iter().collect(),
target_height: BlockNumber(0),
target_height: Default::default(),
l1_events_provider_client: Arc::new(FakeL1EventsProviderClient::default()),
sync_client: Arc::new(MockStateSyncClient::default()),
sync_task_handle: SyncTaskHandle::default(),
Expand Down
Loading