-
Notifications
You must be signed in to change notification settings - Fork 2.2k
channeldb: fix race condition in link node pruning #10462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1363,11 +1363,7 @@ func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) ( | |
| // the pending funds in a channel that has been forcibly closed have been | ||
| // swept. | ||
| func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { | ||
| var ( | ||
| openChannels []*OpenChannel | ||
| pruneLinkNode *btcec.PublicKey | ||
| ) | ||
| err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error { | ||
| return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { | ||
| var b bytes.Buffer | ||
| if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil { | ||
| return err | ||
|
|
@@ -1413,44 +1409,72 @@ func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { | |
| // other open channels with this peer. If we don't we'll | ||
| // garbage collect it to ensure we don't establish persistent | ||
| // connections to peers without open channels. | ||
| pruneLinkNode = chanSummary.RemotePub | ||
| openChannels, err = c.fetchOpenChannels( | ||
| tx, pruneLinkNode, | ||
| ) | ||
| remotePub := chanSummary.RemotePub | ||
| openChannels, err := c.fetchOpenChannels(tx, remotePub) | ||
| if err != nil { | ||
| return fmt.Errorf("unable to fetch open channels for "+ | ||
| "peer %x: %v", | ||
| pruneLinkNode.SerializeCompressed(), err) | ||
| remotePub.SerializeCompressed(), err) | ||
| } | ||
|
|
||
| return nil | ||
| }, func() { | ||
| openChannels = nil | ||
| pruneLinkNode = nil | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if len(openChannels) > 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // If there are no open channels with this peer, prune the | ||
| // link node. We do this within the same transaction to avoid | ||
| // a race condition where a new channel could be opened | ||
| // between this check and the deletion. | ||
| log.Infof("Pruning link node %x with zero open "+ | ||
| "channels from database", | ||
| remotePub.SerializeCompressed()) | ||
|
|
||
| err = deleteLinkNode(tx, remotePub) | ||
| if err != nil { | ||
| return fmt.Errorf("unable to delete link "+ | ||
| "node: %w", err) | ||
| } | ||
|
|
||
| // Decide whether we want to remove the link node, based upon the number | ||
| // of still open channels. | ||
| return c.pruneLinkNode(openChannels, pruneLinkNode) | ||
| return nil | ||
| }, func() {}) | ||
| } | ||
|
|
||
| // pruneLinkNode determines whether we should garbage collect a link node from | ||
| // the database due to no longer having any open channels with it. If there are | ||
| // any left, then this acts as a no-op. | ||
| func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel, | ||
| remotePub *btcec.PublicKey) error { | ||
| // the database due to no longer having any open channels with it. | ||
| // | ||
| // NOTE: This function should be called after an initial check shows no open | ||
| // channels exist. It will double-check within a write transaction to avoid a | ||
| // race condition where a channel could be opened between the initial check | ||
| // and the deletion. | ||
| func (c *ChannelStateDB) pruneLinkNode(remotePub *btcec.PublicKey) error { | ||
| return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { | ||
| // Double-check for open channels to avoid deleting a link node | ||
| // if a channel was opened since the caller's initial check. | ||
| // | ||
| // NOTE: This avoids a race condition where a channel could be | ||
| // opened between the initial check and the deletion. | ||
| openChannels, err := c.fetchOpenChannels(tx, remotePub) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if len(openChannels) > 0 { | ||
| return nil | ||
| } | ||
| // If channels exist now, don't prune. | ||
| if len(openChannels) > 0 { | ||
| return nil | ||
| } | ||
|
|
||
| log.Infof("Pruning link node %x with zero open channels from database", | ||
| remotePub.SerializeCompressed()) | ||
| // No open channels, safe to prune the link node. | ||
| log.Infof("Pruning link node %x with zero open channels "+ | ||
| "from database", | ||
| remotePub.SerializeCompressed()) | ||
|
|
||
| return c.linkNodeDB.DeleteLinkNode(remotePub) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guess this
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is still used for tests so that we can add and remove link nodes, I think we can still keep it. |
||
| err = deleteLinkNode(tx, remotePub) | ||
| if err != nil { | ||
| return fmt.Errorf("unable to prune link node: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| }, func() {}) | ||
| } | ||
|
|
||
| // PruneLinkNodes attempts to prune all link nodes found within the database | ||
|
|
@@ -1479,12 +1503,103 @@ func (c *ChannelStateDB) PruneLinkNodes() error { | |
| return err | ||
| } | ||
|
|
||
| err = c.pruneLinkNode(openChannels, linkNode.IdentityPub) | ||
| if len(openChannels) > 0 { | ||
ziggie1984 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| continue | ||
| } | ||
|
|
||
| err = c.pruneLinkNode(linkNode.IdentityPub) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // RepairLinkNodes scans all channels in the database and ensures that a | ||
| // link node exists for each remote peer. This should be called on startup to | ||
| // ensure that our database is consistent. | ||
| // | ||
| // NOTE: This function is designed to repair database inconsistencies that may | ||
| // have occurred due to the race condition in link node pruning (where link | ||
| // nodes could be incorrectly deleted while channels still existed). This can | ||
| // be removed once we move to native sql. | ||
| func (c *ChannelStateDB) RepairLinkNodes(network wire.BitcoinNet) error { | ||
| // In a single read transaction, build a list of all peers with open | ||
| // channels and check which ones are missing link nodes. | ||
| var missingPeers []*btcec.PublicKey | ||
|
|
||
| err := kvdb.View(c.backend, func(tx kvdb.RTx) error { | ||
| openChanBucket := tx.ReadBucket(openChannelBucket) | ||
| if openChanBucket == nil { | ||
| return ErrNoActiveChannels | ||
| } | ||
|
|
||
| var peersWithChannels []*btcec.PublicKey | ||
|
|
||
| err := openChanBucket.ForEach(func(nodePubBytes, | ||
| _ []byte) error { | ||
|
|
||
| nodePub, err := btcec.ParsePubKey(nodePubBytes) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| channels, err := c.fetchOpenChannels(tx, nodePub) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if len(channels) > 0 { | ||
| peersWithChannels = append( | ||
| peersWithChannels, nodePub, | ||
| ) | ||
| } | ||
|
|
||
| return nil | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Now check which peers are missing link nodes within the | ||
| // same transaction. | ||
| missingPeers, err = c.linkNodeDB.FindMissingLinkNodes( | ||
| tx, peersWithChannels, | ||
| ) | ||
|
|
||
| return err | ||
| }, func() { | ||
| missingPeers = nil | ||
| }) | ||
| if err != nil && !errors.Is(err, ErrNoActiveChannels) { | ||
| return fmt.Errorf("unable to fetch channels: %w", err) | ||
| } | ||
|
|
||
| // Early exit if no repairs needed. | ||
| if len(missingPeers) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // Create all missing link nodes in a single write transaction | ||
| // using the LinkNodeDB abstraction. | ||
| linkNodesToCreate := make([]*LinkNode, 0, len(missingPeers)) | ||
| for _, remotePub := range missingPeers { | ||
| linkNode := NewLinkNode(c.linkNodeDB, network, remotePub) | ||
| linkNodesToCreate = append(linkNodesToCreate, linkNode) | ||
|
|
||
| log.Infof("Repairing missing link node for peer %x", | ||
| remotePub.SerializeCompressed()) | ||
| } | ||
|
|
||
| err = c.linkNodeDB.CreateLinkNodes(nil, linkNodesToCreate) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| log.Infof("Repaired %d missing link nodes on startup", | ||
| len(missingPeers)) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, we should have been doing this in the same db transaction the entire time.
Do we need any sort of reconciliation logic on start up to handle instances that might've happened in the wild? So sanity check that for each open channel, we have a link node present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a fix at startup, not sure if it is worth it but I think it does not cost much either and can be removed when moving to sql native ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking is that there may be nodes in an anomalous state that have a channel, but not an active
LinkNode.Eg, on start up, we use this to determine who we ned to connect out to:
lnd/server.go
Lines 3489 to 3495 in 801de79
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh you are right did not see that. But this is now fixed with the repair function in the beginning of the server startup.