From 7222401d6577209513f602012049f06820b907b7 Mon Sep 17 00:00:00 2001 From: "allen.wu" Date: Fri, 12 Jun 2026 18:19:29 +0800 Subject: [PATCH] feat: add L1 latency halt logic --- node/node.go | 7 ++- rpc/test/helpers.go | 1 + sequencer/broadcast_reactor.go | 20 ++++++- sequencer/broadcast_reactor_test.go | 30 +++++++++++ sequencer/interfaces.go | 11 ++++ sequencer/state_v2.go | 21 +++++--- sequencer/state_v2_test.go | 81 ++++++++++++++++++++--------- test/e2e/node/main.go | 1 + 8 files changed, 139 insertions(+), 33 deletions(-) diff --git a/node/node.go b/node/node.go index 45ae07b592f..a6d0744aa36 100644 --- a/node/node.go +++ b/node/node.go @@ -112,6 +112,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { DefaultMetricsProvider(config.Instrumentation), logger, nil, // sequencerVerifier + nil, // l1Tracker nil, // sequencerSigner nil, // ha: no HA in default node ) @@ -503,6 +504,7 @@ func createSequencerComponents( pool *bc.BlockPool, logger log.Logger, verifier sequencer.SequencerVerifier, + l1Tracker sequencer.L1Tracker, signer sequencer.Signer, sigStore *sequencer.SignatureStore, ha sequencer.SequencerHA, @@ -512,6 +514,7 @@ func createSequencerComponents( l2Node, logger, verifier, + l1Tracker, signer, sigStore, ha, @@ -528,6 +531,7 @@ func createSequencerComponents( stateV2, logger, verifier, + l1Tracker, sigStore, ) broadcastReactor.SetLogger(logger.With("module", "sequencer")) @@ -783,6 +787,7 @@ func NewNode( metricsProvider MetricsProvider, logger log.Logger, sequencerVerifier sequencer.SequencerVerifier, + l1Tracker sequencer.L1Tracker, sequencerSigner sequencer.Signer, ha sequencer.SequencerHA, options ...Option, @@ -1015,6 +1020,7 @@ func NewNode( bcR.Pool(), logger, sequencerVerifier, + l1Tracker, sequencerSigner, sigStore, ha, // HA service injected from NewNode caller; nil disables HA mode @@ -1724,4 +1730,3 @@ func (n *Node) StartReactorsAfterReorg(currentHeight int64) error { } return bcR.SwitchToBlockSyncFromReorg(currentHeight) } - diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 04197b75422..c1dce04961f 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -179,6 +179,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node { nm.DefaultMetricsProvider(config.Instrumentation), logger, nil, // sequencerVerifier + nil, // sequencerHealthGate nil, // sequencerSigner nil, // ha: no HA in RPC test node ) diff --git a/sequencer/broadcast_reactor.go b/sequencer/broadcast_reactor.go index 2abf2f16706..fb7509210d4 100644 --- a/sequencer/broadcast_reactor.go +++ b/sequencer/broadcast_reactor.go @@ -99,8 +99,9 @@ type BlockBroadcastReactor struct { routinesStarted atomic.Bool logger log.Logger - verifier SequencerVerifier - sigStore *SignatureStore + verifier SequencerVerifier + l1Tracker L1Tracker // required: gates peer-block sync on L1 freshness + sigStore *SignatureStore // syncRequests tracks pending sync channel requests, keyed by height. // Used to reject unsolicited responses before decode/verification. @@ -132,6 +133,7 @@ func NewBlockBroadcastReactor( stateV2 *StateV2, logger log.Logger, verifier SequencerVerifier, + l1Tracker L1Tracker, sigStore *SignatureStore, ) *BlockBroadcastReactor { r := &BlockBroadcastReactor{ @@ -147,6 +149,7 @@ func NewBlockBroadcastReactor( blockReqLimiter: NewPeerRateLimiter(blockRequestRateLimit, blockRequestBurst), logger: logger.With("module", "broadcastReactor"), verifier: verifier, + l1Tracker: l1Tracker, sigStore: sigStore, } r.BaseReactor = *p2p.NewBaseReactor("BlockBroadcast", r) @@ -303,6 +306,14 @@ func (r *BlockBroadcastReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte if !r.routinesStarted.Load() { return } + // L1 tracker: while L1 is stale, drop all inbound P2P messages. We may be + // blind to L1 SequencerUpdated events, so we neither accept/verify peer + // blocks (verification uses the verifier's stale L1-derived sequencer set, + // which could let a revoked-sequencer block through or mis-ban honest peers) + // nor serve sync requests. Placed before decode so nothing is processed. + if r.l1Tracker.IsHalt() { + return + } r.logger.Debug("Receive message", "chId", chID, "src", src.ID(), "len", len(msgBytes)) msg, err := decodeMsg(msgBytes) if err != nil { @@ -544,6 +555,11 @@ func (r *BlockBroadcastReactor) tryApplyFromCache() { // checkSyncGap: request missing blocks via SequencerSyncChannel // All sync requests go through this method (no longer uses blocksync pool) func (r *BlockBroadcastReactor) checkSyncGap() { + // L1 tracker: while L1 is stale, do not actively pull blocks from peers. + if r.l1Tracker.IsHalt() { + return + } + localHeight := r.stateV2.LatestHeight() maxPeerHeight := r.getPool().MaxPeerHeight() gap := maxPeerHeight - localHeight diff --git a/sequencer/broadcast_reactor_test.go b/sequencer/broadcast_reactor_test.go index 85315dd6b4a..611ae74a03a 100644 --- a/sequencer/broadcast_reactor_test.go +++ b/sequencer/broadcast_reactor_test.go @@ -274,3 +274,33 @@ func TestBanPeer_PersistentPeerAddPeerNotRejected(t *testing.T) { require.False(t, r.isBanned(mp.ID()), "persistent peer must not be considered banned after misbehavior") } + +// ---------------------------------------------------------------------------- +// L1 health gate on the sync path +// ---------------------------------------------------------------------------- + +func TestReceive_L1HaltDropsMessages(t *testing.T) { + // routinesStarted=true and L1 halted: Receive must drop the message at the + // L1 gate, before decode/verify/ban — so it must NOT panic even though + // stateV2/Switch are nil and the payload is garbage. + r := newReactorForTest() + r.logger = log.NewNopLogger() + r.routinesStarted.Store(true) + r.l1Tracker = &mockL1Tracker{halt: true} + + require.NotPanics(t, func() { + r.Receive(BlockBroadcastChannel, nil, []byte("garbage")) + }, "Receive should drop messages at the L1 gate when halted, before decode") +} + +func TestCheckSyncGap_L1HaltBlocksRequests(t *testing.T) { + // With L1 halted, checkSyncGap must return before calling + // stateV2.LatestHeight()/getPool() (both nil here) — so it must NOT panic. + r := newReactorForTest() + r.logger = log.NewNopLogger() + r.l1Tracker = &mockL1Tracker{halt: true} + + require.NotPanics(t, func() { + r.checkSyncGap() + }, "checkSyncGap should early-return at the L1 gate before dereferencing nil pool/stateV2") +} diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 2a1c09d7336..b9192ff4a2c 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -78,3 +78,14 @@ type SequencerHA interface { // Must be called before Start(). SetOnBlockApplied(fn func(*BlockV2) error) } + +// L1Tracker reports whether L1 RPC has fallen too far behind for this node to +// safely act. When L1 is stale, the node may be blind to L1 SequencerUpdated +// events, so the sequencer must stop producing and fullnodes must stop syncing +// to avoid following a revoked sequencer. Implemented by the L1 tracker in the +// node binary and required by StateV2 / the broadcast reactor. +type L1Tracker interface { + // IsHalt reports whether L1 is stale enough that this node must halt block + // production and sync. + IsHalt() bool +} diff --git a/sequencer/state_v2.go b/sequencer/state_v2.go index 1cbf162c8e0..c5f2e13f501 100644 --- a/sequencer/state_v2.go +++ b/sequencer/state_v2.go @@ -35,12 +35,13 @@ type StateV2 struct { latestBlock *BlockV2 // Dependencies - l2Node l2node.L2Node - signer Signer - verifier SequencerVerifier - sigStore *SignatureStore - ha SequencerHA // nil = single-node mode - logger log.Logger + l2Node l2node.L2Node + signer Signer + verifier SequencerVerifier + l1Tracker L1Tracker // required: gates block production on L1 freshness + sigStore *SignatureStore + ha SequencerHA // nil = single-node mode + logger log.Logger // Block production blockInterval time.Duration // empty-block fallback interval (default 3s) @@ -57,6 +58,7 @@ func NewStateV2( l2Node l2node.L2Node, logger log.Logger, verifier SequencerVerifier, + l1Tracker L1Tracker, signer Signer, sigStore *SignatureStore, ha SequencerHA, @@ -69,6 +71,7 @@ func NewStateV2( l2Node: l2Node, signer: signer, verifier: verifier, + l1Tracker: l1Tracker, sigStore: sigStore, ha: ha, blockInterval: BlockInterval, @@ -203,6 +206,12 @@ func (s *StateV2) isActiveSequencer() bool { return false } + // L1 tracker: stop producing if L1 RPC is stale (we may be blind to + // SequencerUpdated events on L1 and could produce as a revoked sequencer). + if s.l1Tracker.IsHalt() { + return false + } + s.mtx.RLock() lb := s.latestBlock s.mtx.RUnlock() diff --git a/sequencer/state_v2_test.go b/sequencer/state_v2_test.go index 86c5d8fe2b7..054f57bc192 100644 --- a/sequencer/state_v2_test.go +++ b/sequencer/state_v2_test.go @@ -84,7 +84,7 @@ func TestStateV2_NewStateV2(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - stateV2, err := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + stateV2, err := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } @@ -97,7 +97,7 @@ func TestStateV2_LatestHeight(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - stateV2, err := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + stateV2, err := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } @@ -114,13 +114,13 @@ func TestStateV2_HasSigner_MatchesIsSequencerMode(t *testing.T) { mockVerifier := &mockSequencerVerifier{} // Without signer - s1, _ := NewStateV2(mockL2Node, logger, mockVerifier, nil, nil, nil) + s1, _ := NewStateV2(mockL2Node, logger, mockVerifier, nil, nil, nil, nil) if s1.HasSigner() { t.Error("should be false when signer is nil") } // With signer - s2, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockSignerImpl{}, nil, nil) + s2, _ := NewStateV2(mockL2Node, logger, mockVerifier, nil, &mockSignerImpl{}, nil, nil) if !s2.HasSigner() { t.Error("should be true when signer is provided") } @@ -133,7 +133,7 @@ func TestStateV2_SignBlock(t *testing.T) { mockSigner := &mockSignerImpl{signature: make([]byte, 65)} mockVerifier := &mockSequencerVerifier{} - stateV2, err := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + stateV2, err := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } @@ -155,7 +155,7 @@ func TestStateV2_SignBlockWithoutSigner(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - stateV2, err := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + stateV2, err := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } @@ -175,14 +175,14 @@ func TestNewStateV2_VerifierRequired(t *testing.T) { logger := log.NewNopLogger() // verifier==nil should fail - _, err := NewStateV2(mockL2Node, logger, nil, nil, nil, nil) + _, err := NewStateV2(mockL2Node, logger, nil, nil, nil, nil, nil) if err == nil { t.Fatal("expected error when verifier is nil") } // with signer, still fails without verifier mockSigner := &mockSignerImpl{} - _, err = NewStateV2(mockL2Node, logger, nil, mockSigner, nil, nil) + _, err = NewStateV2(mockL2Node, logger, nil, nil, mockSigner, nil, nil) if err == nil { t.Fatal("expected error when verifier is nil (with signer)") } @@ -195,7 +195,7 @@ func TestNewStateV2_WithHA(t *testing.T) { mockVerifier := &mockSequencerVerifier{} ha := newMockSequencerHA(true) - stateV2, err := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, ha) + stateV2, err := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, ha) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } @@ -212,14 +212,14 @@ func TestStateV2_HasSigner(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - fullnode, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + fullnode, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) if fullnode.HasSigner() { t.Error("Fullnode should not have signer") } mockSigner := &mockSignerImpl{} mockVerifier := &mockSequencerVerifier{} - seqNode, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + seqNode, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) if !seqNode.HasSigner() { t.Error("Sequencer node should have signer") } @@ -232,13 +232,13 @@ func TestStateV2_IsHAMode(t *testing.T) { mockVerifier := &mockSequencerVerifier{} // Non-HA - nonHA, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + nonHA, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) if nonHA.IsHAMode() { t.Error("IsHAMode should be false without ha") } // HA - ha, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, newMockSequencerHA(false)) + ha, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, newMockSequencerHA(false)) if !ha.IsHAMode() { t.Error("IsHAMode should be true with ha") } @@ -251,19 +251,19 @@ func TestStateV2_IsHALeader(t *testing.T) { mockVerifier := &mockSequencerVerifier{} // Non-HA: never leader - nonHA, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + nonHA, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) if nonHA.IsHALeader() { t.Error("non-HA node should not be HA leader") } // HA follower - follower, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, newMockSequencerHA(false)) + follower, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, newMockSequencerHA(false)) if follower.IsHALeader() { t.Error("HA follower should not be leader") } // HA leader - leader, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, newMockSequencerHA(true)) + leader, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, newMockSequencerHA(true)) if !leader.IsHALeader() { t.Error("HA leader should be leader") } @@ -279,7 +279,7 @@ func TestStateV2_IsActiveSequencer_NonHA_Active(t *testing.T) { mockSigner := &mockSignerImpl{address: common.HexToAddress("0x1")} mockVerifier := &mockSequencerVerifier{isSequencer: true} - s, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + s, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) s.latestBlock = &BlockV2{Number: 0} if !s.isActiveSequencer() { @@ -293,7 +293,7 @@ func TestStateV2_IsActiveSequencer_NonHA_Inactive(t *testing.T) { mockSigner := &mockSignerImpl{address: common.HexToAddress("0x1")} mockVerifier := &mockSequencerVerifier{isSequencer: false} - s, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + s, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) s.latestBlock = &BlockV2{Number: 0} if s.isActiveSequencer() { @@ -308,7 +308,7 @@ func TestStateV2_IsActiveSequencer_HA_Leader(t *testing.T) { mockVerifier := &mockSequencerVerifier{isSequencer: true} ha := newMockSequencerHA(true) - s, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, ha) + s, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, ha) s.latestBlock = &BlockV2{Number: 0} if !s.isActiveSequencer() { @@ -323,7 +323,7 @@ func TestStateV2_IsActiveSequencer_HA_Follower(t *testing.T) { mockVerifier := &mockSequencerVerifier{isSequencer: true} // L1 says active ha := newMockSequencerHA(false) // but not leader - s, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, ha) + s, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, ha) s.latestBlock = &BlockV2{Number: 0} if s.isActiveSequencer() { @@ -337,7 +337,7 @@ func TestStateV2_IsActiveSequencer_VerifierError(t *testing.T) { mockSigner := &mockSignerImpl{} mockVerifier := &mockSequencerVerifier{err: errors.New("rpc error")} - s, _ := NewStateV2(mockL2Node, logger, mockVerifier, mockSigner, nil, nil) + s, _ := NewStateV2(mockL2Node, logger, mockVerifier, &mockL1Tracker{halt: false}, mockSigner, nil, nil) s.latestBlock = &BlockV2{Number: 0} if s.isActiveSequencer() { @@ -353,7 +353,7 @@ func TestStateV2_ApplyBlock_Idempotent(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - s, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + s, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) block := &types.BlockV2{Number: 1, Signature: []byte{0x01, 0x02, 0x03}} // Apply twice should not error @@ -372,7 +372,7 @@ func TestStateV2_ApplyBlock_OlderBlockSkipped(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - s, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + s, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) block2 := &types.BlockV2{Number: 2, Signature: []byte{0x01, 0x02, 0x03}} block1 := &types.BlockV2{Number: 1, Signature: []byte{0x01, 0x02, 0x03}} @@ -394,7 +394,7 @@ func TestStateV2_ApplyBlock_Sequential(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - s, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil) + s, _ := NewStateV2(mockL2Node, logger, &mockSequencerVerifier{}, nil, nil, nil, nil) for i := uint64(1); i <= 5; i++ { block := &types.BlockV2{Number: i, Signature: []byte{0x01, 0x02, 0x03}} @@ -406,3 +406,36 @@ func TestStateV2_ApplyBlock_Sequential(t *testing.T) { t.Errorf("LatestHeight = %d, want 5", s.LatestHeight()) } } + +// mockL1Tracker is a controllable L1Tracker for tests. +type mockL1Tracker struct { + halt bool +} + +func (m *mockL1Tracker) IsHalt() bool { return m.halt } + +func TestIsActiveSequencer_L1TrackerHaltsProduction(t *testing.T) { + logger := log.NewNopLogger() + verifier := &mockSequencerVerifier{isSequencer: true} + signer := &mockSignerImpl{} + + // L1 healthy -> active (verifier says we are the sequencer). + s, err := NewStateV2(newTestMockL2Node(), logger, verifier, &mockL1Tracker{halt: false}, signer, nil, nil) + if err != nil { + t.Fatalf("NewStateV2: %v", err) + } + s.latestBlock = &BlockV2{Number: 10} + if !s.isActiveSequencer() { + t.Fatal("expected active when L1 healthy and verifier says sequencer") + } + + // L1 halted -> NOT active even though verifier says we are the sequencer. + s2, err := NewStateV2(newTestMockL2Node(), logger, verifier, &mockL1Tracker{halt: true}, signer, nil, nil) + if err != nil { + t.Fatalf("NewStateV2: %v", err) + } + s2.latestBlock = &BlockV2{Number: 10} + if s2.isActiveSequencer() { + t.Fatal("expected NOT active when L1 tracker halts production") + } +} diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go index c81b546c94d..ea3da40a916 100644 --- a/test/e2e/node/main.go +++ b/test/e2e/node/main.go @@ -134,6 +134,7 @@ func startNode(cfg *Config) error { node.DefaultMetricsProvider(tmcfg.Instrumentation), nodeLogger, nil, // sequencerVerifier + nil, // sequencerHealthGate nil, // sequencerSigner nil, // ha: no HA in e2e test node )