Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
mrand "math/rand"
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"strings"
"sync"
"testing"
"text/tabwriter"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -954,3 +957,79 @@ func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byt

return err
}

// TestConcurrentView is a latency probe rather than a functional test. It
// simulates a growing number of concurrent DB readers, each of which opens a
// read-only transaction iters times, looks up a single key and keeps the
// transaction open for a short while. It can't be a Go benchmark since
// worst-case times are even more interesting here than any averages, so the
// min/avg/percentile/max time per db.View call is printed as a table instead.
// It can be helpful for transaction management optimization.
//
// The test is skipped by default; remove the Skip call to run it manually.
func TestConcurrentView(t *testing.T) {
	t.Skip("intended for manual runs")
	const (
		numOfEntries = 10000
		iters        = 10
	)

	db := mustCreateDB(t, &bolt.Options{
		PageSize:     4096,
		NoStatistics: true,
	})

	// Populate a single bucket with numOfEntries small key/value pairs.
	err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix))
		if err != nil {
			return err
		}
		for i := range numOfEntries {
			err = b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
			if err != nil {
				return err
			}
		}
		return nil
	})
	require.NoError(t, err)

	w := new(tabwriter.Writer)
	w.Init(os.Stdout, 0, 8, 1, '\t', 0)
	fmt.Fprintln(w, "workers\tsamples\tmin\tavg\t50%\t80%\t90%\tmax")
	for _, workers := range []int{1, 10, 100, 1000, 10000} {
		t.Run("workers"+strconv.Itoa(workers), func(t *testing.T) {
			// The buffer holds every possible sample, so workers never
			// block on send and the channel can be drained after they
			// have all finished.
			tChan := make(chan time.Duration, workers*iters)

			var wg sync.WaitGroup
			wg.Add(workers)
			for range workers {
				go func() {
					defer wg.Done()
					for range iters {
						start := time.Now()
						err := db.View(func(tx *bolt.Tx) error {
							b := tx.Bucket([]byte(bucketPrefix))
							_ = b.Get([]byte(strconv.Itoa(numOfEntries / 2)))
							time.Sleep(30 * time.Microsecond)
							return nil
						})
						dur := time.Since(start)
						if err != nil {
							// FailNow-style helpers (require.*) must not be
							// called outside the test goroutine; Errorf is
							// safe to use from here.
							t.Errorf("db.View: %v", err)
							return
						}
						tChan <- dur
					}
				}()
			}
			wg.Wait()
			close(tChan)

			res := make([]time.Duration, 0, workers*iters)
			for d := range tChan {
				res = append(res, d)
			}
			// Guard the index arithmetic below against an empty sample
			// set (possible only if every View call failed).
			require.NotEmpty(t, res)

			slices.Sort(res)
			var avg time.Duration
			for i := range res {
				avg += res[i]
			}
			avg /= time.Duration(len(res))
			fmt.Fprintf(w, "%d\t%d\t%s\t%s\t%s\t%s\t%s\t%s\n", workers, len(res), res[0], avg, res[len(res)/2], res[len(res)*8/10], res[len(res)*9/10], res[len(res)-1])
		})
	}
	w.Flush()
	err = db.Close()
	require.NoError(t, err)
	t.Fail() // Deliberately to see the result easily.
}
95 changes: 44 additions & 51 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ const (
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return an ErrDatabaseNotOpen if accessed before Open() is called.
type DB struct {
// Put `stats` at the first field to ensure it's 64-bit aligned. Note that
// the first word in an allocated struct can be relied upon to be 64-bit
// aligned. Refer to https://pkg.go.dev/sync/atomic#pkg-note-BUG. Also
// refer to discussion in https://github.com/etcd-io/bbolt/issues/577.
stats Stats

// When enabled, the database will perform a Check() after every commit.
// A panic is issued if the database is in an inconsistent state. This
// flag has a large performance impact so it should only be used for
Expand Down Expand Up @@ -132,7 +126,7 @@ type DB struct {
pageSize int
opened bool
rwtx *Tx
txs []*Tx
stats *Stats

freelist fl.Interface
freelistLoad sync.Once
Expand All @@ -143,7 +137,7 @@ type DB struct {
batch *batch

rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
metalock sync.RWMutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping.
statlock sync.RWMutex // Protects stats access.

Expand Down Expand Up @@ -197,6 +191,10 @@ func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
db.MaxBatchDelay = common.DefaultMaxBatchDelay
db.AllocSize = common.DefaultAllocSize

if !options.NoStatistics {
db.stats = new(Stats)
}

if options.Logger == nil {
db.logger = getDiscardLogger()
} else {
Expand Down Expand Up @@ -424,7 +422,9 @@ func (db *DB) loadFreelist() {
// Read free list from freelist page.
db.freelist.Read(db.page(db.meta().Freelist()))
}
db.stats.FreePageN = db.freelist.FreeCount()
if db.stats != nil {
db.stats.FreePageN = db.freelist.FreeCount()
}
})
}

Expand Down Expand Up @@ -769,7 +769,7 @@ func (db *DB) beginTx() (*Tx, error) {
// Lock the meta pages while we initialize the transaction. We obtain
// the meta lock before the mmap lock because that's the order that the
// write transaction will obtain them.
db.metalock.Lock()
db.metalock.RLock()

// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
Expand All @@ -779,36 +779,35 @@ func (db *DB) beginTx() (*Tx, error) {
// Exit if the database is not open yet.
if !db.opened {
db.mmaplock.RUnlock()
db.metalock.Unlock()
db.metalock.RUnlock()
return nil, berrors.ErrDatabaseNotOpen
}

// Exit if the database is not correctly mapped.
if db.data == nil {
db.mmaplock.RUnlock()
db.metalock.Unlock()
db.metalock.RUnlock()
return nil, berrors.ErrInvalidMapping
}

// Create a transaction associated with the database.
t := &Tx{}
t.init(db)

// Keep track of transaction until it closes.
db.txs = append(db.txs, t)
n := len(db.txs)
// Unlock the meta pages.
db.metalock.RUnlock()

if db.freelist != nil {
db.freelist.AddReadonlyTXID(t.meta.Txid())
}

// Unlock the meta pages.
db.metalock.Unlock()

// Update the transaction stats.
db.statlock.Lock()
db.stats.TxN++
db.stats.OpenTxN = n
db.statlock.Unlock()
if db.stats != nil {
db.statlock.Lock()
db.stats.TxN++
db.stats.OpenTxN++
db.statlock.Unlock()
}

return t, nil
}
Expand All @@ -825,8 +824,8 @@ func (db *DB) beginRWTx() (*Tx, error) {

// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
db.metalock.RLock()
defer db.metalock.RUnlock()

// Exit if the database is not open yet.
if !db.opened {
Expand All @@ -844,7 +843,7 @@ func (db *DB) beginRWTx() (*Tx, error) {
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
db.freelist.ReleasePendingPages()
db.freelist.ReleasePendingPages(t.meta.Txid())
return t, nil
}

Expand All @@ -853,32 +852,17 @@ func (db *DB) removeTx(tx *Tx) {
// Release the read lock on the mmap.
db.mmaplock.RUnlock()

// Use the meta lock to restrict access to the DB object.
db.metalock.Lock()

// Remove the transaction.
for i, t := range db.txs {
if t == tx {
last := len(db.txs) - 1
db.txs[i] = db.txs[last]
db.txs[last] = nil
db.txs = db.txs[:last]
break
}
}
n := len(db.txs)
if db.freelist != nil {
db.freelist.RemoveReadonlyTXID(tx.meta.Txid())
}

// Unlock the meta pages.
db.metalock.Unlock()

// Merge statistics.
db.statlock.Lock()
db.stats.OpenTxN = n
db.stats.TxStats.add(&tx.stats)
db.statlock.Unlock()
if db.stats != nil {
db.statlock.Lock()
db.stats.OpenTxN--
db.stats.TxStats.add(&tx.stats)
db.statlock.Unlock()
}
}

// Update executes a function within the context of a read-write managed transaction.
Expand Down Expand Up @@ -1096,9 +1080,13 @@ func (db *DB) Sync() (err error) {
// Stats retrieves ongoing performance stats for the database.
// This is only updated when a transaction closes.
func (db *DB) Stats() Stats {
db.statlock.RLock()
defer db.statlock.RUnlock()
return db.stats
var s Stats
if db.stats != nil {
db.statlock.RLock()
s = *db.stats
db.statlock.RUnlock()
}
return s
}

// This is for internal access to the raw data bytes from the C cursor, use
Expand Down Expand Up @@ -1336,15 +1324,20 @@ type Options struct {

// Logger is the logger used for bbolt.
Logger Logger

// NoStatistics turns off statistics collection, Stats method will
// return empty structure in this case. This can be beneficial for
// performance in some cases.
NoStatistics bool
}

func (o *Options) String() string {
if o == nil {
return "{}"
}

return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p}",
o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger)
return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p, NoStatistics: %t}",
o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger, o.NoStatistics)

}

Expand Down
2 changes: 1 addition & 1 deletion internal/freelist/freelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Interface interface {
RemoveReadonlyTXID(txid common.Txid)

// ReleasePendingPages releases any pages associated with closed read-only transactions.
ReleasePendingPages()
ReleasePendingPages(txid common.Txid)

// Free releases a page and its overflow for a given transaction id.
// If the page is already free or is one of the meta pages, then a panic will occur.
Expand Down
14 changes: 7 additions & 7 deletions internal/freelist/freelist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestFreelist_free_freelist_alloctx(t *testing.T) {
if exp := []common.Pgid{12}; !reflect.DeepEqual(exp, f.pendingPageIds()[101].ids) {
t.Fatalf("exp=%v; got=%v", exp, f.pendingPageIds()[101].ids)
}
f.ReleasePendingPages()
f.ReleasePendingPages(102)
require.True(t, f.Freed(12))
require.Empty(t, f.pendingPageIds())
if exp := common.Pgids([]common.Pgid{12}); !reflect.DeepEqual(exp, f.freePageIds()) {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestFreelist_E2E_HappyPath(t *testing.T) {
// someone wants to do a read on top of the next tx id
f.AddReadonlyTXID(common.Txid(3))
// this should free the above pages for tx 2 entirely
f.ReleasePendingPages()
f.ReleasePendingPages(4)
requirePages(t, f, common.Pgids{3, 5, 8}, common.Pgids{})

// no span of two pages available should yield a zero-page result
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestFreelist_E2E_MultiSpanOverflows(t *testing.T) {
f.Free(common.Txid(10), common.NewPage(39, common.LeafPageFlag, 0, 2))
f.Free(common.Txid(10), common.NewPage(45, common.LeafPageFlag, 0, 4))
requirePages(t, f, common.Pgids{}, common.Pgids{20, 21, 25, 26, 27, 35, 36, 37, 38, 39, 40, 41, 45, 46, 47, 48, 49})
f.ReleasePendingPages()
f.ReleasePendingPages(11)
requirePages(t, f, common.Pgids{20, 21, 25, 26, 27, 35, 36, 37, 38, 39, 40, 41, 45, 46, 47, 48, 49}, common.Pgids{})

// that sequence, regardless of implementation, should always yield the same blocks of pages
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestFreelist_E2E_Rollbacks(t *testing.T) {
// unknown transaction should not trigger anything
freelist.Free(common.Txid(4), common.NewPage(13, common.LeafPageFlag, 0, 3))
requirePages(t, freelist, common.Pgids{}, common.Pgids{13, 14, 15, 16})
freelist.ReleasePendingPages()
freelist.ReleasePendingPages(5)
requirePages(t, freelist, common.Pgids{13, 14, 15, 16}, common.Pgids{})
freelist.Rollback(common.Txid(1337))
requirePages(t, freelist, common.Pgids{13, 14, 15, 16}, common.Pgids{})
Expand All @@ -453,7 +453,7 @@ func TestFreelist_E2E_Reload(t *testing.T) {
freelist.Init([]common.Pgid{})
freelist.Free(common.Txid(2), common.NewPage(5, common.LeafPageFlag, 0, 1))
freelist.Free(common.Txid(2), common.NewPage(8, common.LeafPageFlag, 0, 0))
freelist.ReleasePendingPages()
freelist.ReleasePendingPages(3)
requirePages(t, freelist, common.Pgids{5, 6, 8}, common.Pgids{})
buf := make([]byte, 4096)
p := common.LoadPage(buf)
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestFreelist_E2E_SerDe_HappyPath(t *testing.T) {
freelist.Init([]common.Pgid{})
freelist.Free(common.Txid(2), common.NewPage(5, common.LeafPageFlag, 0, 1))
freelist.Free(common.Txid(2), common.NewPage(8, common.LeafPageFlag, 0, 0))
freelist.ReleasePendingPages()
freelist.ReleasePendingPages(3)
requirePages(t, freelist, common.Pgids{5, 6, 8}, common.Pgids{})

freelist.Free(common.Txid(3), common.NewPage(3, common.LeafPageFlag, 0, 1))
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestFreelist_E2E_SerDe_AcrossImplementations(t *testing.T) {
freelist.Free(common.Txid(1), common.NewPage(pgid, common.LeafPageFlag, 0, 0))
expectedFreePgids = append(expectedFreePgids, pgid)
}
freelist.ReleasePendingPages()
freelist.ReleasePendingPages(2)
requirePages(t, freelist, expectedFreePgids, common.Pgids{})
buf := make([]byte, freelist.EstimatedWritePageSize())
p := common.LoadPage(buf)
Expand Down
Loading