Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type DB struct {
batch *batch

rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
metalock sync.RWMutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping.
statlock sync.RWMutex // Protects stats access.

Expand Down Expand Up @@ -422,6 +422,7 @@ func (db *DB) getPageSizeFromSecondMeta() (int, bool, error) {
func (db *DB) loadFreelist() {
db.freelistLoad.Do(func() {
db.freelist = newFreelist(db.FreelistType)
db.freelist.AddCurrentTXID(db.meta().Txid())
if !db.hasSyncedFreelist() {
// Reconstruct free list by scanning the DB.
db.freelist.Init(db.freepages())
Expand Down Expand Up @@ -776,7 +777,7 @@ func (db *DB) beginTx() (*Tx, error) {
// Lock the meta pages while we initialize the transaction. We obtain
// the meta lock before the mmap lock because that's the order that the
// write transaction will obtain them.
db.metalock.Lock()
db.metalock.RLock()

// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
Expand All @@ -786,14 +787,14 @@ func (db *DB) beginTx() (*Tx, error) {
// Exit if the database is not open yet.
if !db.opened {
db.mmaplock.RUnlock()
db.metalock.Unlock()
db.metalock.RUnlock()
return nil, berrors.ErrDatabaseNotOpen
}

// Exit if the database is not correctly mapped.
if db.data == nil {
db.mmaplock.RUnlock()
db.metalock.Unlock()
db.metalock.RUnlock()
return nil, berrors.ErrInvalidMapping
}

Expand All @@ -806,7 +807,7 @@ func (db *DB) beginTx() (*Tx, error) {
}

// Unlock the meta pages.
db.metalock.Unlock()
db.metalock.RUnlock()

// Update the transaction stats.
if db.stats != nil {
Expand All @@ -831,8 +832,8 @@ func (db *DB) beginRWTx() (*Tx, error) {

// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
db.metalock.RLock()
defer db.metalock.RUnlock()

// Exit if the database is not open yet.
if !db.opened {
Expand Down Expand Up @@ -860,14 +861,14 @@ func (db *DB) removeTx(tx *Tx) {
db.mmaplock.RUnlock()

// Use the meta lock to restrict access to the DB object.
db.metalock.Lock()
db.metalock.RLock()

if db.freelist != nil {
db.freelist.RemoveReadonlyTXID(tx.meta.Txid())
}

// Unlock the meta pages.
db.metalock.Unlock()
db.metalock.RUnlock()

// Merge statistics.
if db.stats != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/freelist/freelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Interface interface {
// Init initializes this freelist with the given list of pages.
Init(ids common.Pgids)

// AddCurrentTXID records tid as the latest known transaction ID in the
// list of transaction IDs that read-only transactions can reference.
AddCurrentTXID(tid common.Txid)

// Allocate tries to allocate the given number of contiguous pages
// from the free list pages. It returns the starting page ID if
// available; otherwise, it returns 0.
Expand Down
56 changes: 40 additions & 16 deletions internal/freelist/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package freelist
import (
"fmt"
"math"
"slices"
"sort"
"sync/atomic"
"unsafe"

"go.etcd.io/bbolt/internal/common"
Expand All @@ -15,13 +17,18 @@ type txPending struct {
lastReleaseBegin common.Txid // beginning txid of last matching releaseRange
}

// txIdReference pairs a transaction ID with a counter of the read-only
// transactions that currently reference it.
type txIdReference struct {
// txid is the transaction ID this entry tracks.
txid common.Txid
// refs is bumped by AddReadonlyTXID and dropped by RemoveReadonlyTXID;
// it is only touched through atomic Add/Load calls. An entry with a
// zero count is ignored when computing the oldest open reader.
refs atomic.Int32
}

type shared struct {
Interface

readonlyTXIDs []common.Txid // all readonly transaction IDs.
allocs map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid.
cache map[common.Pgid]struct{} // fast lookup of all free and pending page ids.
pending map[common.Txid]*txPending // mapping of soon-to-be free page ids by tx.
readerRefs []*txIdReference
allocs map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid.
cache map[common.Pgid]struct{} // fast lookup of all free and pending page ids.
pending map[common.Txid]*txPending // mapping of soon-to-be free page ids by tx.
}

func newShared() *shared {
Expand Down Expand Up @@ -118,15 +125,18 @@ func (t *shared) Rollback(txid common.Txid) {
}

func (t *shared) AddReadonlyTXID(tid common.Txid) {
t.readonlyTXIDs = append(t.readonlyTXIDs, tid)
for _, r := range t.readerRefs {
if r.txid == tid {
r.refs.Add(1)
break
}
}
}

func (t *shared) RemoveReadonlyTXID(tid common.Txid) {
for i := range t.readonlyTXIDs {
if t.readonlyTXIDs[i] == tid {
last := len(t.readonlyTXIDs) - 1
t.readonlyTXIDs[i] = t.readonlyTXIDs[last]
t.readonlyTXIDs = t.readonlyTXIDs[:last]
for _, r := range t.readerRefs {
if r.txid == tid {
r.refs.Add(-1)
break
}
}
Expand All @@ -140,23 +150,37 @@ func (t txIDx) Less(i, j int) bool { return t[i] < t[j] }

func (t *shared) ReleasePendingPages() {
// Free all pending pages prior to the earliest open transaction.
sort.Sort(txIDx(t.readonlyTXIDs))
minid := common.Txid(math.MaxUint64)
if len(t.readonlyTXIDs) > 0 {
minid = t.readonlyTXIDs[0]

for i := range t.readerRefs {
if t.readerRefs[i].refs.Load() != 0 && minid > t.readerRefs[i].txid {
minid = t.readerRefs[i].txid
}
}
if minid > 0 {
t.release(minid - 1)
}
// Release unused txid extents.
for _, tid := range t.readonlyTXIDs {
t.releaseRange(minid, tid-1)
minid = tid + 1
for _, e := range t.readerRefs {
t.releaseRange(minid, e.txid-1)
minid = e.txid + 1
}
t.releaseRange(minid, common.Txid(math.MaxUint64))
// Any page both allocated and freed in an extent is safe to release.
}

// AddCurrentTXID appends a fresh, zero-referenced entry for tid to
// readerRefs after pruning entries that are no longer needed.
//
// An entry is pruned when either:
//   - it already carries tid (possibly left behind by an earlier
//     unsuccessful transaction; it cannot have readers, and re-adding it
//     below keeps the code simple), or
//   - it is older than tid-1 and has no remaining references.
//
// The entry for tid-1 is always kept because new readers can still pick it
// up until the meta pages are updated.
func (t *shared) AddCurrentTXID(tid common.Txid) {
	// Last txid that new readers may still attach to.
	prev := tid - 1

	obsolete := func(ref *txIdReference) bool {
		if ref.txid == tid {
			return true
		}
		if ref.txid >= prev {
			return false
		}
		return ref.refs.Load() == 0
	}

	kept := slices.DeleteFunc(t.readerRefs, obsolete)
	t.readerRefs = append(kept, &txIdReference{txid: tid})
}

func (t *shared) release(txid common.Txid) {
m := make(common.Pgids, 0)
for tid, txp := range t.pending {
Expand Down
1 change: 1 addition & 0 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ func (tx *Tx) writeMeta() error {
lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err)
return err
}
tx.db.freelist.AddCurrentTXID(tx.meta.Txid())
tx.db.metalock.Unlock()
if !tx.db.NoSync || common.IgnoreNoSync {
// gofail: var beforeSyncMetaPage struct{}
Expand Down