diff --git a/concurrent_test.go b/concurrent_test.go index 07c0c1f77..43e061c33 100644 --- a/concurrent_test.go +++ b/concurrent_test.go @@ -10,10 +10,13 @@ import ( mrand "math/rand" "os" "path/filepath" + "slices" "sort" + "strconv" "strings" "sync" "testing" + "text/tabwriter" "time" "unicode/utf8" @@ -954,3 +957,79 @@ func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byt return err } + +// TestConcurrentView is a sort of benchmark really, not a functional test, +// it simulates a number of concurrent DB readers trying to open a transaction +// and do something. It can't be a Go benchmark since worst-case times are even +// more interesting here than any averages. It can be helpful for transaction +// management optimization. +func TestConcurrentView(t *testing.T) { + t.Skip("intended for manual runs") + const ( + numOfEntries = 10000 + iters = 10 + ) + + db := mustCreateDB(t, &bolt.Options{ + PageSize: 4096, + NoStatistics: true, + }) + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucketPrefix)) + if err != nil { + return err + } + for i := range numOfEntries { + err = b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + if err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + w := new(tabwriter.Writer) + w.Init(os.Stdout, 0, 8, 1, '\t', 0) + fmt.Fprintln(w, "workers\tsamples\tmin\tavg\t50%\t80%\t90%\tmax") + for _, workers := range []int{1, 10, 100, 1000, 10000} { + t.Run("workers"+strconv.Itoa(workers), func(t *testing.T) { + tChan := make(chan time.Duration, workers*iters) + res := make([]time.Duration, 0, workers*iters) + + for range workers { + go func() { + for range iters { + start := time.Now() + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucketPrefix)) + _ = b.Get([]byte(strconv.Itoa(numOfEntries / 2))) + time.Sleep(30 * time.Microsecond) + return nil + }) + dur := time.Since(start) + require.NoError(t, err) + tChan <- dur + } + }() + } + for t := range tChan { + res = append(res, t) + if len(res) == cap(res) { + break + } + } + slices.Sort(res) + var avg time.Duration + for i := range res { + avg += res[i] + } + avg /= time.Duration(len(res)) + fmt.Fprintf(w, "%d\t%d\t%s\t%s\t%s\t%s\t%s\t%s\n", workers, len(res), res[0], avg, res[len(res)/2], res[len(res)*8/10], res[len(res)*9/10], res[len(res)-1]) + }) + } + w.Flush() + err = db.Close() + require.NoError(t, err) + t.Fail() // Deliberately to see the result easily. +} diff --git a/db.go b/db.go index 4171983bc..93be9c119 100644 --- a/db.go +++ b/db.go @@ -36,12 +36,6 @@ const ( // All data access is performed through transactions which can be obtained through the DB. // All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called. type DB struct { - // Put `stats` at the first field to ensure it's 64-bit aligned. Note that - // the first word in an allocated struct can be relied upon to be 64-bit - // aligned. Refer to https://pkg.go.dev/sync/atomic#pkg-note-BUG. Also - // refer to discussion in https://github.com/etcd-io/bbolt/issues/577. - stats Stats - // When enabled, the database will perform a Check() after every commit. // A panic is issued if the database is in an inconsistent state. This // flag has a large performance impact so it should only be used for @@ -132,7 +126,7 @@ type DB struct { pageSize int opened bool rwtx *Tx - txs []*Tx + stats *Stats freelist fl.Interface freelistLoad sync.Once @@ -143,7 +137,7 @@ type DB struct { batch *batch rwlock sync.Mutex // Allows only one writer at a time. - metalock sync.Mutex // Protects meta page access. + metalock sync.RWMutex // Protects meta page access. mmaplock sync.RWMutex // Protects mmap access during remapping. statlock sync.RWMutex // Protects stats access. @@ -197,6 +191,10 @@ func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) { db.MaxBatchDelay = common.DefaultMaxBatchDelay db.AllocSize = common.DefaultAllocSize + if !options.NoStatistics { + db.stats = new(Stats) + } + if options.Logger == nil { db.logger = getDiscardLogger() } else { @@ -424,7 +422,9 @@ func (db *DB) loadFreelist() { // Read free list from freelist page. db.freelist.Read(db.page(db.meta().Freelist())) } - db.stats.FreePageN = db.freelist.FreeCount() + if db.stats != nil { + db.stats.FreePageN = db.freelist.FreeCount() + } }) } @@ -769,7 +769,7 @@ func (db *DB) beginTx() (*Tx, error) { // Lock the meta pages while we initialize the transaction. We obtain // the meta lock before the mmap lock because that's the order that the // write transaction will obtain them. - db.metalock.Lock() + db.metalock.RLock() // Obtain a read-only lock on the mmap. When the mmap is remapped it will // obtain a write lock so all transactions must finish before it can be @@ -779,14 +779,14 @@ func (db *DB) beginTx() (*Tx, error) { // Exit if the database is not open yet. if !db.opened { db.mmaplock.RUnlock() - db.metalock.Unlock() + db.metalock.RUnlock() return nil, berrors.ErrDatabaseNotOpen } // Exit if the database is not correctly mapped. if db.data == nil { db.mmaplock.RUnlock() - db.metalock.Unlock() + db.metalock.RUnlock() return nil, berrors.ErrInvalidMapping } @@ -794,21 +794,20 @@ func (db *DB) beginTx() (*Tx, error) { t := &Tx{} t.init(db) - // Keep track of transaction until it closes. - db.txs = append(db.txs, t) - n := len(db.txs) + // Unlock the meta pages. + db.metalock.RUnlock() + if db.freelist != nil { db.freelist.AddReadonlyTXID(t.meta.Txid()) } - // Unlock the meta pages. - db.metalock.Unlock() - // Update the transaction stats. - db.statlock.Lock() - db.stats.TxN++ - db.stats.OpenTxN = n - db.statlock.Unlock() + if db.stats != nil { + db.statlock.Lock() + db.stats.TxN++ + db.stats.OpenTxN++ + db.statlock.Unlock() + } return t, nil } @@ -825,8 +824,8 @@ func (db *DB) beginRWTx() (*Tx, error) { // Once we have the writer lock then we can lock the meta pages so that // we can set up the transaction. - db.metalock.Lock() - defer db.metalock.Unlock() + db.metalock.RLock() + defer db.metalock.RUnlock() // Exit if the database is not open yet. if !db.opened { @@ -844,7 +843,7 @@ func (db *DB) beginRWTx() (*Tx, error) { t := &Tx{writable: true} t.init(db) db.rwtx = t - db.freelist.ReleasePendingPages() + db.freelist.ReleasePendingPages(t.meta.Txid()) return t, nil } @@ -853,32 +852,17 @@ func (db *DB) removeTx(tx *Tx) { // Release the read lock on the mmap. db.mmaplock.RUnlock() - // Use the meta lock to restrict access to the DB object. - db.metalock.Lock() - - // Remove the transaction. - for i, t := range db.txs { - if t == tx { - last := len(db.txs) - 1 - db.txs[i] = db.txs[last] - db.txs[last] = nil - db.txs = db.txs[:last] - break - } - } - n := len(db.txs) if db.freelist != nil { db.freelist.RemoveReadonlyTXID(tx.meta.Txid()) } - // Unlock the meta pages. - db.metalock.Unlock() - // Merge statistics. - db.statlock.Lock() - db.stats.OpenTxN = n - db.stats.TxStats.add(&tx.stats) - db.statlock.Unlock() + if db.stats != nil { + db.statlock.Lock() + db.stats.OpenTxN-- + db.stats.TxStats.add(&tx.stats) + db.statlock.Unlock() + } } // Update executes a function within the context of a read-write managed transaction. @@ -1096,9 +1080,13 @@ func (db *DB) Sync() (err error) { // Stats retrieves ongoing performance stats for the database. // This is only updated when a transaction closes. func (db *DB) Stats() Stats { - db.statlock.RLock() - defer db.statlock.RUnlock() - return db.stats + var s Stats + if db.stats != nil { + db.statlock.RLock() + s = *db.stats + db.statlock.RUnlock() + } + return s } // This is for internal access to the raw data bytes from the C cursor, use @@ -1336,6 +1324,11 @@ type Options struct { // Logger is the logger used for bbolt. Logger Logger + + // NoStatistics turns off statistics collection, Stats method will + // return empty structure in this case. This can be beneficial for + // performance in some cases. + NoStatistics bool } func (o *Options) String() string { @@ -1343,8 +1336,8 @@ func (o *Options) String() string { return "{}" } - return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p}", - o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger) + return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p, NoStatistics: %t}", + o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger, o.NoStatistics) } diff --git a/internal/freelist/freelist.go b/internal/freelist/freelist.go index 2b819506b..2d2da3dc8 100644 --- a/internal/freelist/freelist.go +++ b/internal/freelist/freelist.go @@ -43,7 +43,7 @@ type Interface interface { RemoveReadonlyTXID(txid common.Txid) // ReleasePendingPages releases any pages associated with closed read-only transactions. - ReleasePendingPages() + ReleasePendingPages(txid common.Txid) // Free releases a page and its overflow for a given transaction id. // If the page is already free or is one of the meta pages, then a panic will occur. diff --git a/internal/freelist/freelist_test.go b/internal/freelist/freelist_test.go index 12ac4b6b2..45676ed05 100644 --- a/internal/freelist/freelist_test.go +++ b/internal/freelist/freelist_test.go @@ -79,7 +79,7 @@ func TestFreelist_free_freelist_alloctx(t *testing.T) { if exp := []common.Pgid{12}; !reflect.DeepEqual(exp, f.pendingPageIds()[101].ids) { t.Fatalf("exp=%v; got=%v", exp, f.pendingPageIds()[101].ids) } - f.ReleasePendingPages() + f.ReleasePendingPages(102) require.True(t, f.Freed(12)) require.Empty(t, f.pendingPageIds()) if exp := common.Pgids([]common.Pgid{12}); !reflect.DeepEqual(exp, f.freePageIds()) { @@ -372,7 +372,7 @@ func TestFreelist_E2E_HappyPath(t *testing.T) { // someone wants to do a read on top of the next tx id f.AddReadonlyTXID(common.Txid(3)) // this should free the above pages for tx 2 entirely - f.ReleasePendingPages() + f.ReleasePendingPages(4) requirePages(t, f, common.Pgids{3, 5, 8}, common.Pgids{}) // no span of two pages available should yield a zero-page result @@ -400,7 +400,7 @@ func TestFreelist_E2E_MultiSpanOverflows(t *testing.T) { f.Free(common.Txid(10), common.NewPage(39, common.LeafPageFlag, 0, 2)) f.Free(common.Txid(10), common.NewPage(45, common.LeafPageFlag, 0, 4)) requirePages(t, f, common.Pgids{}, common.Pgids{20, 21, 25, 26, 27, 35, 36, 37, 38, 39, 40, 41, 45, 46, 47, 48, 49}) - f.ReleasePendingPages() + f.ReleasePendingPages(11) requirePages(t, f, common.Pgids{20, 21, 25, 26, 27, 35, 36, 37, 38, 39, 40, 41, 45, 46, 47, 48, 49}, common.Pgids{}) // that sequence, regardless of implementation, should always yield the same blocks of pages @@ -428,7 +428,7 @@ func TestFreelist_E2E_Rollbacks(t *testing.T) { // unknown transaction should not trigger anything freelist.Free(common.Txid(4), common.NewPage(13, common.LeafPageFlag, 0, 3)) requirePages(t, freelist, common.Pgids{}, common.Pgids{13, 14, 15, 16}) - freelist.ReleasePendingPages() + freelist.ReleasePendingPages(5) requirePages(t, freelist, common.Pgids{13, 14, 15, 16}, common.Pgids{}) freelist.Rollback(common.Txid(1337)) requirePages(t, freelist, common.Pgids{13, 14, 15, 16}, common.Pgids{}) @@ -453,7 +453,7 @@ func TestFreelist_E2E_Reload(t *testing.T) { freelist.Init([]common.Pgid{}) freelist.Free(common.Txid(2), common.NewPage(5, common.LeafPageFlag, 0, 1)) freelist.Free(common.Txid(2), common.NewPage(8, common.LeafPageFlag, 0, 0)) - freelist.ReleasePendingPages() + freelist.ReleasePendingPages(3) requirePages(t, freelist, common.Pgids{5, 6, 8}, common.Pgids{}) buf := make([]byte, 4096) p := common.LoadPage(buf) @@ -489,7 +489,7 @@ func TestFreelist_E2E_SerDe_HappyPath(t *testing.T) { freelist.Init([]common.Pgid{}) freelist.Free(common.Txid(2), common.NewPage(5, common.LeafPageFlag, 0, 1)) freelist.Free(common.Txid(2), common.NewPage(8, common.LeafPageFlag, 0, 0)) - freelist.ReleasePendingPages() + freelist.ReleasePendingPages(3) requirePages(t, freelist, common.Pgids{5, 6, 8}, common.Pgids{}) freelist.Free(common.Txid(3), common.NewPage(3, common.LeafPageFlag, 0, 1)) @@ -519,7 +519,7 @@ func TestFreelist_E2E_SerDe_AcrossImplementations(t *testing.T) { freelist.Free(common.Txid(1), common.NewPage(pgid, common.LeafPageFlag, 0, 0)) expectedFreePgids = append(expectedFreePgids, pgid) } - freelist.ReleasePendingPages() + freelist.ReleasePendingPages(2) requirePages(t, freelist, expectedFreePgids, common.Pgids{}) buf := make([]byte, freelist.EstimatedWritePageSize()) p := common.LoadPage(buf) diff --git a/internal/freelist/shared.go b/internal/freelist/shared.go index f2d113008..0490ce78b 100644 --- a/internal/freelist/shared.go +++ b/internal/freelist/shared.go @@ -3,7 +3,10 @@ package freelist import ( "fmt" "math" + "slices" "sort" + "sync" + "sync/atomic" "unsafe" "go.etcd.io/bbolt/internal/common" @@ -15,10 +18,16 @@ type txPending struct { lastReleaseBegin common.Txid // beginning txid of last matching releaseRange } +type txIdReference struct { + txid common.Txid + refs atomic.Int32 +} + type shared struct { Interface - readonlyTXIDs []common.Txid // all readonly transaction IDs. + readerRefsMtx sync.RWMutex + readerRefs []*txIdReference allocs map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid. cache map[common.Pgid]struct{} // fast lookup of all free and pending page ids. pending map[common.Txid]*txPending // mapping of soon-to-be free page ids by tx. @@ -118,18 +127,25 @@ func (t *shared) Rollback(txid common.Txid) { } func (t *shared) AddReadonlyTXID(tid common.Txid) { - t.readonlyTXIDs = append(t.readonlyTXIDs, tid) + t.readerRefsMtx.RLock() + for _, r := range t.readerRefs { + if r.txid == tid { + r.refs.Add(1) + break + } + } + t.readerRefsMtx.RUnlock() } func (t *shared) RemoveReadonlyTXID(tid common.Txid) { - for i := range t.readonlyTXIDs { - if t.readonlyTXIDs[i] == tid { - last := len(t.readonlyTXIDs) - 1 - t.readonlyTXIDs[i] = t.readonlyTXIDs[last] - t.readonlyTXIDs = t.readonlyTXIDs[:last] + t.readerRefsMtx.RLock() + for _, r := range t.readerRefs { + if r.txid == tid { + r.refs.Add(-1) break } } + t.readerRefsMtx.RUnlock() } type txIDx []common.Txid @@ -138,21 +154,30 @@ func (t txIDx) Len() int { return len(t) } func (t txIDx) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t txIDx) Less(i, j int) bool { return t[i] < t[j] } -func (t *shared) ReleasePendingPages() { +func (t *shared) ReleasePendingPages(tid common.Txid) { // Free all pending pages prior to the earliest open transaction. - sort.Sort(txIDx(t.readonlyTXIDs)) minid := common.Txid(math.MaxUint64) - if len(t.readonlyTXIDs) > 0 { - minid = t.readonlyTXIDs[0] + inUseID := tid - 1 // new readers can still use it + + t.readerRefsMtx.Lock() + t.readerRefs = slices.DeleteFunc(t.readerRefs, func(e *txIdReference) bool { + return e.txid < inUseID && e.refs.Load() == 0 + }) + for i := range t.readerRefs { + if t.readerRefs[i].refs.Load() != 0 && minid > t.readerRefs[i].txid { + minid = t.readerRefs[i].txid + } } if minid > 0 { t.release(minid - 1) } // Release unused txid extents. - for _, tid := range t.readonlyTXIDs { - t.releaseRange(minid, tid-1) - minid = tid + 1 + for _, e := range t.readerRefs { + t.releaseRange(minid, e.txid-1) + minid = e.txid + 1 } + t.readerRefs = append(t.readerRefs, &txIdReference{txid: tid}) + t.readerRefsMtx.Unlock() t.releaseRange(minid, common.Txid(math.MaxUint64)) // Any page both allocated and freed in an extent is safe to release. } diff --git a/tx.go b/tx.go index 7b5db7727..066448c55 100644 --- a/tx.go +++ b/tx.go @@ -357,13 +357,15 @@ func (tx *Tx) close() { tx.db.rwlock.Unlock() // Merge statistics. - tx.db.statlock.Lock() - tx.db.stats.FreePageN = freelistFreeN - tx.db.stats.PendingPageN = freelistPendingN - tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize - tx.db.stats.FreelistInuse = freelistAlloc - tx.db.stats.TxStats.add(&tx.stats) - tx.db.statlock.Unlock() + if tx.db.stats != nil { + tx.db.statlock.Lock() + tx.db.stats.FreePageN = freelistFreeN + tx.db.stats.PendingPageN = freelistPendingN + tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize + tx.db.stats.FreelistInuse = freelistAlloc + tx.db.stats.TxStats.add(&tx.stats) + tx.db.statlock.Unlock() + } } else { tx.db.removeTx(tx) } @@ -558,7 +560,9 @@ func (tx *Tx) writeMeta() error { lg := tx.db.Logger() buf := make([]byte, tx.db.pageSize) p := tx.db.pageInBuffer(buf, 0) + tx.db.metalock.Lock() tx.meta.Write(p) + tx.db.metalock.Unlock() // Write the meta page to file. if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil {