**Codecov Report** ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main    #3037      +/-  ##
==========================================
- Coverage   58.30%   58.24%   -0.07%
==========================================
  Files        2108     2108
  Lines      173683   173318     -365
==========================================
- Hits       101268   100949     -319
+ Misses      63397    63366      -31
+ Partials     9018     9003      -15
```
```go
type connSet[C peerConn] = im.Map[connID, C]

func GetAny[C peerConn](cs connSet[C], id types.NodeID) (C, bool) {
	if c, ok := cs.Get(connID{id, true}); ok {
		return c, true
	}
```

**Reviewer:** So we will always prefer the outbound connection over the inbound one? Would we expect to have two connections between most stable peers?

**Author:** Users of `GetAny` do not distinguish inbound from outbound, so the fact that we allow both to coexist in some corner cases is an implementation detail.

**Author:** In the current implementation we do NOT expect to have two connections between stable peers. We avoid it on a best-effort basis (via a dial rule that disallows dialing if an inbound connection already exists), but to keep the internal peer-manager logic relatively simple, we allow two connections to coexist in case of a race condition during dialing.
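A minimal sketch of the lookup order described above, with plain maps and strings standing in for the real `im.Map`, `connID`, and `peerConn` types (the boolean marking an outbound connection is an assumption based on the snippet):

```go
package main

import "fmt"

// connID identifies one of possibly two connections to a node.
// Assumed convention from the snippet above: outbound == true is tried first.
type connID struct {
	node     string
	outbound bool
}

// connSet is a plain-map stand-in for the immutable map in the reviewed code.
type connSet map[connID]string

// getAny returns any live connection to id, preferring the outbound one.
func getAny(cs connSet, id string) (string, bool) {
	if c, ok := cs[connID{id, true}]; ok {
		return c, true // prefer the outbound connection
	}
	c, ok := cs[connID{id, false}]
	return c, ok
}

func main() {
	cs := connSet{
		connID{"peer1", false}: "inbound-conn",
		connID{"peer1", true}:  "outbound-conn",
	}
	c, _ := getAny(cs, "peer1")
	fmt.Println(c) // outbound-conn
}
```

This illustrates why callers cannot observe whether a duplicate pair exists: they always get exactly one connection back.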
```go
for inner, ctrl := range m.inner.Lock() {
	// Start with pool which has NOT dialed previously (for fairness).
	pools := utils.Slice(inner.persistent, inner.regular)
	if pools[0] == inner.lastDialPool {
```

**Reviewer:** I don't understand the design choice here. I assume persistent peers are configured "friends", so they should always be dialed ASAP, and since the user configured them, the list is unlikely to contain many invalid entries anyway. Am I missing anything?

**Author:** Persistent peers are very likely to have bad addresses: people add persistent peers but don't care about keeping the list healthy. Also, if any of the persistent peers goes down, we don't want it to block dialing regular peers.
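The fairness rule in the snippet above can be sketched as a two-pool alternation; the pool names and the `lastDialPool` bookkeeping are taken from the snippet, while the rest is a hypothetical simplification:

```go
package main

import "fmt"

// nextPools orders the two dial pools so that the pool which dialed last
// goes second. This way a pool full of stale persistent addresses cannot
// starve the regular pool (hypothetical sketch of the reviewed logic).
func nextPools(pools []string, lastDialPool string) []string {
	if len(pools) == 2 && pools[0] == lastDialPool {
		pools[0], pools[1] = pools[1], pools[0]
	}
	return pools
}

func main() {
	fmt.Println(nextPools([]string{"persistent", "regular"}, "persistent")) // [regular persistent]
	fmt.Println(nextPools([]string{"persistent", "regular"}, "regular"))    // [persistent regular]
}
```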
```go
for inner, ctrl := range p.inner.Lock() {
	inner.DialFailed(addr)

// DialFailed notifies the peer manager that dialing addresses of id has failed.
func (m *peerManager[C]) DialFailed(id types.NodeID) error {
```

**Reviewer:** Would we mark the node failed on a temporary failure as well? Should we have a more complicated model to distinguish between temporary failures and long-term failures (like NAT addresses)?

**Author:** We could; what do you propose?

**Reviewer:** Not in this PR, but in the future we could give a rating to how flaky an address is and maybe pass that information along to peers.
```go
if len(addrs) == 0 {
	return nil
}

func (m *peerManager[C]) PushPex(sender types.NodeID, addrs []NodeAddress) error {
```

**Reviewer:** How much verification does PushPex give us? Assuming we don't have many malicious peers who send bad addresses, do we trust incoming addresses blindly?

**Author:** PushPex keeps a separate capacity for each sender (to avoid an unbounded flood of bad addresses). Other than that, we trust them blindly. I was thinking about gossiping signed, versioned addresses (so that it would not be feasible to gossip fake addresses of honest nodes), but I'm not sure at the moment how much value that would bring and whether it would be computationally feasible.
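The per-sender capacity the author describes can be sketched as a bounded per-sender bucket; the type and field names below are hypothetical, only the bounding idea comes from the discussion:

```go
package main

import "fmt"

// pexBuffer bounds how many addresses each sender may contribute, so a
// single peer flooding bad addresses cannot evict addresses learned from
// other peers (sketch of the capacity scheme described above).
type pexBuffer struct {
	perSenderCap int
	bySender     map[string][]string // sender node ID -> addresses it pushed
}

// push records addrs from sender, silently dropping anything beyond the
// sender's capacity.
func (b *pexBuffer) push(sender string, addrs []string) {
	got := b.bySender[sender]
	for _, a := range addrs {
		if len(got) >= b.perSenderCap {
			break // overflow affects this sender only
		}
		got = append(got, a)
	}
	b.bySender[sender] = got
}

func main() {
	b := &pexBuffer{perSenderCap: 2, bySender: map[string][]string{}}
	b.push("spammer", []string{"a1", "a2", "a3", "a4"})
	b.push("honest", []string{"b1"})
	fmt.Println(len(b.bySender["spammer"]), len(b.bySender["honest"])) // 2 1
}
```

Note that within its capacity a sender is still trusted blindly, matching the author's description.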
```go
	r.peerManager.DialFailed(id)
	return err
}
dialAddrRaw := hConn.conn.RemoteAddr()
```

**Reviewer:** Do we save ip:port instead of the hostname now? What if the hostname later resolves to a different ip:port?

**Author:** If it resolves to a different ip:port, the TCP connection will terminate and we will stop advertising the ip:port, since we are no longer connected to the peer. I would like to eventually disallow gossiping DNS addresses in favor of resolving them locally first, but for backward compatibility and to limit the scope of changes, we do the resolution only when dialing the peer (i.e. SelfAddress stays unresolved for now).
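The "resolve only when dialing" behavior can be sketched as follows; the function names and the injected resolver are hypothetical (the real code would use something like `net.LookupHost`), only the split between stored hostname and dialed ip:port comes from the discussion:

```go
package main

import (
	"fmt"
	"strings"
)

// resolver maps a hostname to an IP; injected so the sketch is testable
// offline (the real implementation would resolve via DNS).
type resolver func(host string) (string, error)

// dialAddr keeps the configured/gossiped address in hostname form and
// resolves it to ip:port only at dial time, as described above.
func dialAddr(addr string, resolve resolver) (string, error) {
	host, port, ok := strings.Cut(addr, ":")
	if !ok {
		return "", fmt.Errorf("missing port in %q", addr)
	}
	ip, err := resolve(host)
	if err != nil {
		return "", err
	}
	return ip + ":" + port, nil
}

func main() {
	// Stub resolver standing in for a DNS lookup.
	fake := func(host string) (string, error) { return "10.0.0.7", nil }
	out, _ := dialAddr("node.example.com:26656", fake)
	fmt.Println(out) // 10.0.0.7:26656
}
```

If the hostname later points elsewhere, the stored address is unchanged; only the next dial observes the new resolution, which matches the author's explanation.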
```go
	inner utils.Watch[*peerManagerInner[C]]
	conns utils.AtomicRecv[connSet[C]]
}

func (p *peerManager[C]) LogState() {
	for inner := range p.inner.Lock() {
		p.logger.Info("p2p connections",
```

**Reviewer:** Do we have metrics showing roughly how many peers are oscillating? Could it be that a few bad peers contribute most of the disconnections? Are those permanently bad addresses, like NAT addresses, or just peers with network issues?

**Author:** It is very likely that preconfigured bad persistent addresses are responsible for most of the dial failures. We have metrics counting TCP dials/accepts, but they are not attributed to specific node IDs. Do you have a specific metric in mind that you would like to have here? NodeID/IP have too large a cardinality to use as a Prometheus metric label, but perhaps we can collect it in some smarter way. We also have the net_info HTTP endpoint, where we can expose such stats for a dashboard. Or we can include them in log error messages (depending on the verbosity needed).

**Author:** NAT is not a problem, because we do not support gossiping unidirectional NAT addresses: we only gossip outbound dial addresses (i.e. addresses that we successfully dialed) or preconfigured ExternalAddresses (self-declared static addresses). As for private network addresses (private IPs, Kubernetes-internal DNS, etc.), I don't know the scale of the problem.
```go
for _, e := range t.bySender {
	if !yield(e) {
		return
	}
}
```

**Check warning (code scanning / CodeQL):** Iteration over map.
```go
for old, priority := range p.out {
	if id, ok := low.Get(); !ok || priority < id.priority {
		low = utils.Some(pNodeID{priority, old})
	}
}
```

**Check warning (code scanning / CodeQL):** Iteration over map.
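CodeQL flags these loops because Go deliberately randomizes map iteration order. The two flagged loops compute an order-independent result (a minimum, or visiting every element), so the warning is likely benign here; where deterministic order does matter, the usual fix is to iterate over sorted keys. A minimal sketch:

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the map's keys in a stable order, making iteration
// deterministic across runs (the standard remedy for the CodeQL warning
// when order actually matters).
func sortedKeys(m map[string]uint64) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	m := map[string]uint64{"b": 2, "a": 1, "c": 3}
	for _, k := range sortedKeys(m) {
		fmt.Print(k)
	}
	fmt.Println() // abc
}
```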
```go
	ID       types.NodeID
	Channels ChannelIDSet
	DialAddr utils.Option[NodeAddress]
	SelfAddr utils.Option[NodeAddress]
```

**Reviewer:** Nit: DialAddr here means the outbound address, while SelfAddr is the self-declared address (so it's an inbound connection)? The names are a bit confusing.
```go
regular: newPoolManager(&poolConfig{
	MaxIn:      options.maxInbound(),
	MaxOut:     options.maxOutbound(),
	FixedAddrs: bootstrapAddrs,
```

**Reviewer:** Is it possible that someone set all bootstrap addresses as persistent, so FixedAddrs is now empty?
```go
// It makes the global topology converge to an uniformly random graph
// of a bounded degree.
priority := func(id types.NodeID) uint64 {
	return binary.LittleEndian.Uint64(h.Sum([]byte(id)[:8]))
```

**Reviewer:** I think this is wrong. `h.Sum(b)` will append the hash digest to `b` instead of hashing `b`, and `Uint64()` just takes the first 8 bytes, so we are only using the first 8 bytes of the id, not the hash.
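The reviewer's point can be demonstrated directly. `hash.Hash.Sum(b)` appends the digest of the hash's *current state* to `b` and returns the result, so taking the first 8 bytes reads the raw id bytes, never the hash. A runnable comparison (using `sha256` as a stand-in for whatever hash `h` is in the reviewed code, and a hypothetical fixed version):

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// rawPrefix reads the first 8 bytes of the id directly.
func rawPrefix(id string) uint64 {
	return binary.LittleEndian.Uint64([]byte(id)[:8])
}

// buggyPriority mirrors the reviewed snippet: h.Sum(b) APPENDS the digest
// of h's current (empty) state to b, so Uint64 ends up reading the raw id
// bytes, not a hash of them.
func buggyPriority(id string) uint64 {
	h := sha256.New()
	return binary.LittleEndian.Uint64(h.Sum([]byte(id)[:8]))
}

// priority is a hypothetical fix: hash the full id first, then read the
// first 8 bytes of the digest.
func priority(id string) uint64 {
	sum := sha256.Sum256([]byte(id))
	return binary.LittleEndian.Uint64(sum[:8])
}

func main() {
	id := "deadbeefdeadbeef-node"
	fmt.Println(buggyPriority(id) == rawPrefix(id)) // true: the bug ignores the hash entirely
	fmt.Println(priority(id) == rawPrefix(id))      // false with overwhelming probability
}
```

Since the buggy version is just the id prefix, node IDs with a common prefix would all get the same "random" priority, defeating the stable-hashing design.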
```go
		}
		return nil
	})
	return nil
```

**Reviewer:** Should the `return nil` be here? We don't want to exit the loop, right?
```go
// We cannot keep it unbounded to avoid OOM.
const maxHistory = 10000
if len(p.dialHistory) >= maxHistory {
	p.dialHistory = map[types.NodeID]struct{}{}
```

**Reviewer:** Should we do a total reset here, or should we just clear part of the history?
```diff
@@ -157,77 +133,6 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
 	testNet.listenForPeerDown(t, 1, 0)
 }

-func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
```

**Reviewer:** Are these tests replaced by new tests somewhere?
```go
}

// Connected adds conn to the connections pool.
// Connected peer won't be available for dialing until disconnect (we don't need duplicate connections).
// May close and drop a duplicate connection already present in the pool.
// Returns an error if the connection should be rejected.
func (m *peerManager[C]) Connected(conn C) error {
	id := conn.Info().connID()
	if id.NodeID == m.selfID {
```

**Reviewer:** When would this ever happen?
```go
pool := inner.poolByID(id.NodeID)
toDisconnect, err := pool.Connect(id)
if err != nil {
	conn.Close()
```

**Reviewer:** Hmm, we are closing a connection we just established; when would this be necessary?
```go
	}
	return ids
}

// DEPRECATED, currently returns an address iff we are connected to id.
```

**Reviewer:** Nit: this one is still used somewhere, right?
```diff
 selfID := privKey.Public().NodeID()
 peerManager := newPeerManager[*ConnV2](logger, selfID, options)
-peerDB, err := newPeerDB(db, options.maxPeers())
+peerDB, err := newPeerDB(db, min(options.maxOutbound(), 100))
```
To make the p2p network uniformly random, nodes need to be able to change their peers periodically (otherwise the network would stay centralized around the bootstrap peers). This PR implements an algorithm based on stable hashing, which makes each node assign a random priority to every node ID and connect to the peers with the highest priorities.

This PR also separates the inbound and outbound connection pools to simplify the logic. In particular, it is possible for 2 concurrent connections (inbound + outbound) to exist between 2 peers. Avoiding duplicate connections is best effort: nodes try not to dial peers they are already connected to, but if 2 peers dial each other at the same time, both connections are kept.
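The stable-hashing peer selection described above can be sketched as follows. The seed/separator construction and the `topK` helper are assumptions for illustration; only the idea of deterministic per-node-ID pseudo-random priorities with a bounded degree comes from the PR description:

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
)

// priority assigns each peer ID a deterministic pseudo-random priority,
// derived from this node's own seed so every node ranks peers differently.
func priority(seed, id string) uint64 {
	sum := sha256.Sum256([]byte(seed + "/" + id))
	return binary.LittleEndian.Uint64(sum[:8])
}

// topK keeps the k highest-priority peers. Because priorities are stable,
// repeated rounds pick the same peers unless the candidate set changes,
// while globally the choices look uniformly random.
func topK(seed string, peers []string, k int) []string {
	sorted := append([]string(nil), peers...)
	sort.Slice(sorted, func(i, j int) bool {
		return priority(seed, sorted[i]) > priority(seed, sorted[j])
	})
	if len(sorted) > k {
		sorted = sorted[:k]
	}
	return sorted
}

func main() {
	peers := []string{"n1", "n2", "n3", "n4", "n5"}
	fmt.Println(topK("self-node-id", peers, 2))
}
```

As new peer addresses are learned, higher-priority candidates displace lower-priority connections, which is what lets the topology drift away from the bootstrap peers over time.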
Basic requirements: