Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 147 additions & 32 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,11 +1363,7 @@ func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
// the pending funds in a channel that has been forcibly closed have been
// swept.
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
var (
openChannels []*OpenChannel
pruneLinkNode *btcec.PublicKey
)
err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
var b bytes.Buffer
if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
return err
Expand Down Expand Up @@ -1413,44 +1409,72 @@ func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// other open channels with this peer. If we don't we'll
// garbage collect it to ensure we don't establish persistent
// connections to peers without open channels.
pruneLinkNode = chanSummary.RemotePub
openChannels, err = c.fetchOpenChannels(
tx, pruneLinkNode,
)
remotePub := chanSummary.RemotePub
openChannels, err := c.fetchOpenChannels(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to fetch open channels for "+
"peer %x: %v",
pruneLinkNode.SerializeCompressed(), err)
remotePub.SerializeCompressed(), err)
}

return nil
}, func() {
openChannels = nil
pruneLinkNode = nil
})
if err != nil {
return err
}
if len(openChannels) > 0 {
return nil
}

// If there are no open channels with this peer, prune the
// link node. We do this within the same transaction to avoid
// a race condition where a new channel could be opened
// between this check and the deletion.
log.Infof("Pruning link node %x with zero open "+
"channels from database",
remotePub.SerializeCompressed())

err = deleteLinkNode(tx, remotePub)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, we should have been doing this in the same db transaction the entire time.

Do we need any sort of reconciliation logic on start up to handle instances that might've happened in the wild? So sanity check that for each open channel, we have a link node present.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a fix at startup, not sure if it is worth it but I think it does not cost much either and can be removed when moving to sql native ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it is worth it

My thinking is that there may be nodes in an anomalous state that have a channel, but not an active LinkNode.

Eg, on start up, we use this to determine who we ned to connect out to:

lnd/server.go

Lines 3489 to 3495 in 801de79

// Iterate through the list of LinkNodes to find addresses we should
// attempt to connect to based on our set of previous connections. Set
// the reconnection port to the default peer port.
linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
if err != nil && !errors.Is(err, channeldb.ErrLinkNodesNotFound) {
return fmt.Errorf("failed to fetch all link nodes: %w", err)
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh you are right did not see that. But this is now fixed with the repair function in the beginning of the server startup.

if err != nil {
return fmt.Errorf("unable to delete link "+
"node: %w", err)
}

// Decide whether we want to remove the link node, based upon the number
// of still open channels.
return c.pruneLinkNode(openChannels, pruneLinkNode)
return nil
}, func() {})
}

// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
remotePub *btcec.PublicKey) error {
// the database due to no longer having any open channels with it.
//
// NOTE: This function should be called after an initial check shows no open
// channels exist. It will double-check within a write transaction to avoid a
// race condition where a channel could be opened between the initial check
// and the deletion.
func (c *ChannelStateDB) pruneLinkNode(remotePub *btcec.PublicKey) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
// Double-check for open channels to avoid deleting a link node
// if a channel was opened since the caller's initial check.
//
// NOTE: This avoids a race condition where a channel could be
// opened between the initial check and the deletion.
openChannels, err := c.fetchOpenChannels(tx, remotePub)
if err != nil {
return err
}

if len(openChannels) > 0 {
return nil
}
// If channels exist now, don't prune.
if len(openChannels) > 0 {
return nil
}

log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())
// No open channels, safe to prune the link node.
log.Infof("Pruning link node %x with zero open channels "+
"from database",
remotePub.SerializeCompressed())

return c.linkNodeDB.DeleteLinkNode(remotePub)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess this DeleteLinkNode can now be deleted?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is still used for tests so that we can add and remove link nodes, I think we can still keep it.

err = deleteLinkNode(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to prune link node: %w", err)
}

return nil
}, func() {})
}

// PruneLinkNodes attempts to prune all link nodes found within the database
Expand Down Expand Up @@ -1479,12 +1503,103 @@ func (c *ChannelStateDB) PruneLinkNodes() error {
return err
}

err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
if len(openChannels) > 0 {
continue
}

err = c.pruneLinkNode(linkNode.IdentityPub)
if err != nil {
return err
}
}

return nil
}

// RepairLinkNodes scans all channels in the database and ensures that a
// link node exists for each remote peer. This should be called on startup to
// ensure that our database is consistent.
//
// NOTE: This function is designed to repair database inconsistencies that may
// have occurred due to the race condition in link node pruning (where link
// nodes could be incorrectly deleted while channels still existed). This can
// be removed once we move to native sql.
func (c *ChannelStateDB) RepairLinkNodes(network wire.BitcoinNet) error {
// In a single read transaction, build a list of all peers with open
// channels and check which ones are missing link nodes.
var missingPeers []*btcec.PublicKey

err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
openChanBucket := tx.ReadBucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoActiveChannels
}

var peersWithChannels []*btcec.PublicKey

err := openChanBucket.ForEach(func(nodePubBytes,
_ []byte) error {

nodePub, err := btcec.ParsePubKey(nodePubBytes)
if err != nil {
return err
}

channels, err := c.fetchOpenChannels(tx, nodePub)
if err != nil {
return err
}

if len(channels) > 0 {
peersWithChannels = append(
peersWithChannels, nodePub,
)
}

return nil
})
if err != nil {
return err
}

// Now check which peers are missing link nodes within the
// same transaction.
missingPeers, err = c.linkNodeDB.FindMissingLinkNodes(
tx, peersWithChannels,
)

return err
}, func() {
missingPeers = nil
})
if err != nil && !errors.Is(err, ErrNoActiveChannels) {
return fmt.Errorf("unable to fetch channels: %w", err)
}

// Early exit if no repairs needed.
if len(missingPeers) == 0 {
return nil
}

// Create all missing link nodes in a single write transaction
// using the LinkNodeDB abstraction.
linkNodesToCreate := make([]*LinkNode, 0, len(missingPeers))
for _, remotePub := range missingPeers {
linkNode := NewLinkNode(c.linkNodeDB, network, remotePub)
linkNodesToCreate = append(linkNodesToCreate, linkNode)

log.Infof("Repairing missing link node for peer %x",
remotePub.SerializeCompressed())
}

err = c.linkNodeDB.CreateLinkNodes(nil, linkNodesToCreate)
if err != nil {
return err
}

log.Infof("Repaired %d missing link nodes on startup",
len(missingPeers))

return nil
}

Expand Down
91 changes: 91 additions & 0 deletions channeldb/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package channeldb

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"time"
Expand Down Expand Up @@ -134,6 +136,95 @@ type LinkNodeDB struct {
backend kvdb.Backend
}

// FindMissingLinkNodes checks which of the provided public keys do not have
// corresponding link nodes in the database. If tx is nil, a new read
// transaction will be created. Otherwise, the provided transaction is used,
// allowing this to be part of a larger batch operation.
func (l *LinkNodeDB) FindMissingLinkNodes(tx kvdb.RTx,
pubKeys []*btcec.PublicKey) ([]*btcec.PublicKey, error) {

var missing []*btcec.PublicKey

findMissing := func(readTx kvdb.RTx) error {
nodeMetaBucket := readTx.ReadBucket(nodeInfoBucket)
if nodeMetaBucket == nil {
// If the bucket doesn't exist, all peers are missing.
missing = pubKeys
return nil
}

for _, pubKey := range pubKeys {
_, err := fetchLinkNode(readTx, pubKey)
if err == nil {
// Link node exists.
continue
}

if !errors.Is(err, ErrNodeNotFound) {
return fmt.Errorf("unable to check link node "+
"for peer %x: %w",
pubKey.SerializeCompressed(), err)
}

// Link node doesn't exist.
missing = append(missing, pubKey)
}

return nil
}

// If no transaction provided, create our own.
if tx == nil {
err := kvdb.View(l.backend, findMissing, func() {
missing = nil
})

return missing, err
}

// Use the provided transaction.
err := findMissing(tx)

return missing, err
}

// CreateLinkNodes creates multiple link nodes. If tx is nil, a new write
// transaction will be created. Otherwise, the provided transaction is used,
// allowing this to be part of a larger batch operation.
func (l *LinkNodeDB) CreateLinkNodes(tx kvdb.RwTx,
linkNodes []*LinkNode) error {

createNodes := func(writeTx kvdb.RwTx) error {
nodeMetaBucket, err := writeTx.CreateTopLevelBucket(
nodeInfoBucket,
)
if err != nil {
return err
}

for _, linkNode := range linkNodes {
err := putLinkNode(nodeMetaBucket, linkNode)
if err != nil {
pubKey := linkNode.IdentityPub.
SerializeCompressed()

return fmt.Errorf("unable to create link "+
"node for peer %x: %w", pubKey, err)
}
}

return nil
}

// If no transaction provided, create our own.
if tx == nil {
return kvdb.Update(l.backend, createNodes, func() {})
}

// Use the provided transaction.
return createNodes(tx)
}

// DeleteLinkNode removes the link node with the given identity from the
// database.
func (l *LinkNodeDB) DeleteLinkNode(identity *btcec.PublicKey) error {
Expand Down
Loading
Loading