diff --git a/CHANGELOG.md b/CHANGELOG.md index c6fe673a8fc..fb20b99ac96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,7 +51,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Include W3C TraceFlags (bits 0–7) in the OTLP `Span.Flags` field in `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracehttp` and `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracegrpc`. (#7438) - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) -- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) +- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427, #7474, #7478) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 2b60410801b..bffdb93d3bc 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -74,12 +74,13 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] { // LastValue returns a last-value aggregate function input and output. func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { - lv := newLastValue[N](b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: - return b.filter(lv.measure), lv.delta + lv := newDeltaLastValue[N](b.AggregationLimit, b.resFunc()) + return b.filter(lv.measure), lv.collect default: - return b.filter(lv.measure), lv.cumulative + lv := newCumulativeLastValue[N](b.AggregationLimit, b.resFunc()) + return b.filter(lv.measure), lv.collect } } @@ -126,12 +127,13 @@ func (b Builder[N]) ExplicitBucketHistogram( boundaries []float64, noMinMax, noSum bool, ) (Measure[N], ComputeAggregation) { - h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: - return b.filter(h.measure), h.delta + h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect default: - return b.filter(h.measure), h.cumulative + h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect } } @@ -141,12 +143,13 @@ func (b Builder[N]) ExponentialBucketHistogram( maxSize, maxScale int32, noMinMax, noSum bool, ) (Measure[N], ComputeAggregation) { - h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: - return b.filter(h.measure), h.delta + h := newDeltaExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect default: - return b.filter(h.measure), h.cumulative + h := newCumulativeExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect } } diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go index 0fa6d3c6fa8..dba9dbba9af 100644 --- a/sdk/metric/internal/aggregate/atomic.go +++ b/sdk/metric/internal/aggregate/atomic.go @@ -51,6 +51,97 @@ func (n *atomicCounter[N]) add(value N) { } } +// reset resets the internal state, and is not safe to call concurrently. 
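+// reset is intended for the collector: in this patch it is only called
+// (via mergeIntoAndReset) after a hot/cold swap has drained all writers
+// of this copy, so plain Stores are sufficient.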
+func (n *atomicCounter[N]) reset() { + n.nFloatBits.Store(0) + n.nInt.Store(0) +} + +// atomicN is a generic atomic number value. +type atomicN[N int64 | float64] struct { + val atomic.Uint64 +} + +func (a *atomicN[N]) Load() (value N) { + v := a.val.Load() + switch any(value).(type) { + case int64: + value = N(v) + case float64: + value = N(math.Float64frombits(v)) + default: + panic("unsupported type") + } + return value +} + +func (a *atomicN[N]) Store(v N) { + var val uint64 + switch any(v).(type) { + case int64: + val = uint64(v) + case float64: + val = math.Float64bits(float64(v)) + default: + panic("unsupported type") + } + a.val.Store(val) +} + +func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool { + var o, n uint64 + switch any(oldN).(type) { + case int64: + o, n = uint64(oldN), uint64(newN) + case float64: + o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN)) + default: + panic("unsupported type") + } + return a.val.CompareAndSwap(o, n) +} + +type atomicMinMax[N int64 | float64] struct { + minimum, maximum atomicN[N] + set atomic.Bool + mu sync.Mutex +} + +// init returns true if the value was used to initialize min and max. +func (s *atomicMinMax[N]) init(val N) bool { + s.mu.Lock() + defer s.mu.Unlock() + if !s.set.Load() { + defer s.set.Store(true) + s.minimum.Store(val) + s.maximum.Store(val) + return true + } + return false +} + +func (s *atomicMinMax[N]) Update(val N) { + if !s.set.Load() && s.init(val) { + return + } + + old := s.minimum.Load() + for val < old { + if s.minimum.CompareAndSwap(old, val) { + return + } + old = s.minimum.Load() + } + + old = s.maximum.Load() + for old < val { + if s.maximum.CompareAndSwap(old, val) { + return + } + old = s.maximum.Load() + } +} + // hotColdWaitGroup is a synchronization primitive which enables lockless // writes for concurrent writers and enables a reader to acquire exclusive // access to a snapshot of state including only completed operations. @@ -102,6 +193,10 @@ func (l *hotColdWaitGroup) start() uint64 { return l.startedCountAndHotIdx.Add(1) >> 63 } +func (l *hotColdWaitGroup) loadHot() uint64 { + return l.startedCountAndHotIdx.Load() >> 63 +} + // done signals to the reader that an operation has fully completed. // done is safe to call concurrently. func (l *hotColdWaitGroup) done(hotIdx uint64) { @@ -182,3 +277,58 @@ func (m *limitedSyncMap) Len() int { defer m.lenMux.Unlock() return m.len } + +// atomicLimitedRange is a range which can grow to at most maxSize. It is used +// to emulate a slice which is bounded in size, but can grow in either +// direction from any starting index. +type atomicLimitedRange struct { + startAndEnd atomic.Uint64 + maxSize int32 +} + +func (r *atomicLimitedRange) Load() (start, end int32) { + n := r.startAndEnd.Load() + return int32(n >> 32), int32(n & ((1 << 32) - 1)) +} + +func (r *atomicLimitedRange) Store(start, end int32) { + // end must be cast to a uint32 first to avoid sign extension. + r.startAndEnd.Store(uint64(start)<<32 | uint64(uint32(end))) +} + +func (r *atomicLimitedRange) Add(idx int32) bool { + for { + n := r.startAndEnd.Load() + start, end := int32(n>>32), int32(n&((1<<32)-1)) + if idx >= start && idx < end { + // no expansion needed + return true + } + + // If idx doesn't fit, still expand as far as possible in that + // direction to prevent the range from growing in the opposite + // direction. This ensures the following scale change is able to fit + // our point after the change. 
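+		// For example, with maxSize = 20 and a current range of [0, 1),
+		// Add(-25) cannot grow the range to [-25, 1); it instead stores
+		// [-19, 1) and returns false so the caller can downscale and retry.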
+		partialExpansion := false
+		if start == end {
+			start = idx
+			end = idx + 1
+		} else if idx < start {
+			start = idx
+			if end-start > r.maxSize {
+				start = end - r.maxSize
+				partialExpansion = true
+			}
+		} else if idx >= end {
+			end = idx + 1
+			if end-start > r.maxSize {
+				end = start + r.maxSize
+				partialExpansion = true
+			}
+		}
+		if !r.startAndEnd.CompareAndSwap(n, uint64(start)<<32|uint64(uint32(end))) {
+			continue
+		}
+		return !partialExpansion
+	}
+}
diff --git a/sdk/metric/internal/aggregate/atomic_test.go b/sdk/metric/internal/aggregate/atomic_test.go
index 52f053248d7..7d9d3266d7d 100644
--- a/sdk/metric/internal/aggregate/atomic_test.go
+++ b/sdk/metric/internal/aggregate/atomic_test.go
@@ -76,3 +76,35 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func TestAtomicLimitedRange(t *testing.T) {
+	a := &atomicLimitedRange{maxSize: 20}
+	start, end := a.Load()
+	assert.Equal(t, int32(0), start)
+	assert.Equal(t, int32(0), end)
+	a.Store(-20, -1)
+	start, end = a.Load()
+	assert.Equal(t, int32(-20), start)
+	assert.Equal(t, int32(-1), end)
+	a.Store(0, 0)
+	start, end = a.Load()
+	assert.Equal(t, int32(0), start)
+	assert.Equal(t, int32(0), end)
+	assert.True(t, a.Add(10))
+	start, end = a.Load()
+	assert.Equal(t, int32(10), start)
+	assert.Equal(t, int32(11), end)
+	assert.True(t, a.Add(20))
+	start, end = a.Load()
+	assert.Equal(t, int32(10), start)
+	assert.Equal(t, int32(21), end)
+	// Exceeds maxSize by 1.
+	assert.False(t, a.Add(0))
+	start, end = a.Load()
+	assert.Equal(t, int32(1), start)
+	assert.Equal(t, int32(21), end)
+	a.Store(-3, -2)
+	start, end = a.Load()
+	assert.Equal(t, int32(-3), start)
+	assert.Equal(t, int32(-2), end)
+}
diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go
index 5b3a19c067d..8d8cffb3730 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -6,8 +6,10 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 import (
 	"context"
 	"errors"
+	"fmt"
 	"math"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"go.opentelemetry.io/otel"
@@ -18,112 +20,388 @@ import (
 const (
 	expoMaxScale = 20
 	expoMinScale = -10
-
-	smallestNonZeroNormalFloat64 = 0x1p-1022
-
-	// These redefine the Math constants with a type, so the compiler won't coerce
-	// them into an int on 32 bit platforms.
-	maxInt64 int64 = math.MaxInt64
-	minInt64 int64 = math.MinInt64
 )
 
 // expoHistogramDataPoint is a single data point in an exponential histogram.
 type expoHistogramDataPoint[N int64 | float64] struct {
+	rescaleMux sync.Mutex
+	expoHistogramPointCounters[N]
+
 	attrs attribute.Set
 	res   FilteredExemplarReservoir[N]
+}
 
-	count uint64
-	min   N
-	max   N
-	sum   N
+func newExpoHistogramDataPoint[N int64 | float64](
+	attrs attribute.Set,
+	maxSize int,
+	maxScale int32,
+) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag
+	return &expoHistogramDataPoint[N]{
+		attrs:                      attrs,
+		expoHistogramPointCounters: newExpoHistogramPointCounters[N](maxSize, maxScale),
+	}
+}
 
-	maxSize  int
-	noMinMax bool
-	noSum    bool
+// hotColdExpoHistogramPoint holds a hot and a cold set of exponential
+// histogram point counters, used in cumulative aggregations.
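+// Writers record into the hot copy while collection swaps hot and cold,
+// waits for in-flight writes to drain, reads the cold copy, and merges it
+// back into the new hot copy so cumulative totals are preserved.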
+type hotColdExpoHistogramPoint[N int64 | float64] struct { + rescaleMux sync.Mutex + hcwg hotColdWaitGroup + hotColdPoint [2]expoHistogramPointCounters[N] - scale int32 + attrs attribute.Set + res FilteredExemplarReservoir[N] - posBuckets expoBuckets - negBuckets expoBuckets - zeroCount uint64 + maxScale int32 } -func newExpoHistogramDataPoint[N int64 | float64]( +func newHotColdExpoHistogramDataPoint[N int64 | float64]( attrs attribute.Set, maxSize int, maxScale int32, - noMinMax, noSum bool, -) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag - f := math.MaxFloat64 - ma := N(f) // if N is int64, max will overflow to -9223372036854775808 - mi := N(-f) - if N(maxInt64) > N(f) { - ma = N(maxInt64) - mi = N(minInt64) - } - return &expoHistogramDataPoint[N]{ +) *hotColdExpoHistogramPoint[N] { // nolint:revive // we need this control flag + return &hotColdExpoHistogramPoint[N]{ attrs: attrs, - min: ma, - max: mi, - maxSize: maxSize, - noMinMax: noMinMax, - noSum: noSum, - scale: maxScale, + maxScale: maxScale, + hotColdPoint: [2]expoHistogramPointCounters[N]{ + newExpoHistogramPointCounters[N](maxSize, maxScale), + newExpoHistogramPointCounters[N](maxSize, maxScale), + }, + } +} + +func (p *expoHistogramPointCounters[N]) tryFastRecord(v N, noMinMax, noSum bool) bool { // nolint:revive // we need this control flag + absV := math.Abs(float64(v)) + if float64(absV) == 0.0 { + p.zeroCount.Add(1) + return true + } + bucket := &p.posBuckets + if v < 0 { + bucket = &p.negBuckets + } + if !bucket.tryFastRecord(absV) { + return false } + if !noMinMax { + p.minMax.Update(v) + } + if !noSum { + p.sum.add(v) + } + return true } // record adds a new measurement to the histogram. It will rescale the buckets if needed. -func (p *expoHistogramDataPoint[N]) record(v N) { - p.count++ +// The caller must hold the rescaleMux lock +func (p *expoHistogramPointCounters[N]) record(v N, noMinMax, noSum bool) { // nolint:revive // we need this control flag + absV := math.Abs(float64(v)) + bucket := &p.posBuckets + if v < 0 { + bucket = &p.negBuckets + } + if !bucket.record(absV) { + // We failed to record for an unrecoverable reason. + return + } + if !noMinMax { + p.minMax.Update(v) + } + if !noSum { + p.sum.add(v) + } +} - if !p.noMinMax { - if v < p.min { - p.min = v - } - if v > p.max { - p.max = v - } +// expoHistogramPointCounters contains only the atomic counter data, and is +// used by both expoHistogramDataPoint and hotColdExpoHistogramPoint. +type expoHistogramPointCounters[N int64 | float64] struct { + minMax atomicMinMax[N] + sum atomicCounter[N] + zeroCount atomic.Uint64 + + posBuckets hotColdExpoBuckets + negBuckets hotColdExpoBuckets +} + +func newExpoHistogramPointCounters[N int64 | float64]( + maxSize int, + maxScale int32) expoHistogramPointCounters[N] { + return expoHistogramPointCounters[N]{ + posBuckets: newHotColdExpoBuckets(maxSize, maxScale), + negBuckets: newHotColdExpoBuckets(maxSize, maxScale), } - if !p.noSum { - p.sum += v +} + +// loadInto writes the values of the counters into the datapoint. +// It is safe to call concurrently, but callers need to use a hot/cold +// waitgroup to ensure consistent results. 
+func (e *expoHistogramPointCounters[N]) loadInto(into *metricdata.ExponentialHistogramDataPoint[N], noMinMax, noSum bool) {
+	into.ZeroCount = e.zeroCount.Load()
+	if !noSum {
+		into.Sum = e.sum.load()
+	}
+	if !noMinMax && e.minMax.set.Load() {
+		into.Min = metricdata.NewExtrema(e.minMax.minimum.Load())
+		into.Max = metricdata.NewExtrema(e.minMax.maximum.Load())
+	}
+	into.Scale = e.posBuckets.unifyScale(&e.negBuckets)
 
-	absV := math.Abs(float64(v))
+	posCount, posOffset := e.posBuckets.loadCountsAndOffset(&into.PositiveBucket.Counts)
+	into.PositiveBucket.Offset = posOffset
 
-	if float64(absV) == 0.0 {
-		p.zeroCount++
-		return
+	negCount, negOffset := e.negBuckets.loadCountsAndOffset(&into.NegativeBucket.Counts)
+	into.NegativeBucket.Offset = negOffset
+
+	into.Count = posCount + negCount + into.ZeroCount
+}
+
+// mergeIntoAndReset merges this set of histogram counter data into another,
+// and resets the state of this set of counters. This is used by
+// hotColdExpoHistogramPoint to ensure that the cumulative counters continue
+// to accumulate after being read.
+func (p *expoHistogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Intentional internal control flag
+	into *expoHistogramPointCounters[N],
+	noMinMax, noSum bool,
+) {
+	// Swap in 0 to reset the zero count.
+	into.zeroCount.Add(p.zeroCount.Swap(0))
+	// Do not reset min or max because cumulative min and max only ever grow
+	// smaller or larger respectively.
+	if !noMinMax && p.minMax.set.Load() {
+		into.minMax.Update(p.minMax.minimum.Load())
+		into.minMax.Update(p.minMax.maximum.Load())
+	}
+	if !noSum {
+		into.sum.add(p.sum.load())
+		p.sum.reset()
+	}
+	p.posBuckets.mergeIntoAndReset(&into.posBuckets)
+	p.negBuckets.mergeIntoAndReset(&into.negBuckets)
+}
+
+type hotColdExpoBuckets struct {
+	hcwg           hotColdWaitGroup
+	hotColdBuckets [2]expoBuckets
+
+	maxScale int32
+}
+
+func newHotColdExpoBuckets(maxSize int, maxScale int32) hotColdExpoBuckets {
+	return hotColdExpoBuckets{
+		hotColdBuckets: [2]expoBuckets{
+			newExpoBuckets(maxSize, maxScale),
+			newExpoBuckets(maxSize, maxScale),
+		},
+		maxScale: maxScale,
+	}
+}
+
+// tryFastRecord is the fast path for exponential histogram measurements. It
+// succeeds if the value can be written without downscaling the buckets.
+// If it fails, it returns false, and also expands the range of the buckets as
+// far towards the required bin as possible to prevent the range from changing
+// while we downscale.
+func (b *hotColdExpoBuckets) tryFastRecord(v float64) bool {
+	hotIdx := b.hcwg.start()
+	defer b.hcwg.done(hotIdx)
+	return b.hotColdBuckets[hotIdx].recordBucket(b.hotColdBuckets[hotIdx].getBin(v))
+}
+
+// record is the slow path, invoked when tryFastRecord fails.
+// It locks to prevent concurrent scale changes. It downscales buckets to fit
+// the measurement, and then records it.
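+// For example, with maxSize 4 and bins [0, 4) populated at scale 0, a value
+// in bin 6 needs a scale change of 1: the existing bins collapse to [0, 2)
+// at scale -1, and the new value lands in bin 3.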
+func (b *hotColdExpoBuckets) record(v float64) bool {
+	// Hot may have been swapped while we were waiting for the lock.
+	// We don't use p.hcwg.start() because we already hold the lock, and would
+	// deadlock when waiting for writes to complete.
+	hotIdx := b.hcwg.loadHot()
+	hotBucket := &b.hotColdBuckets[hotIdx]
+
+	// Try recording again in case the buckets were resized while we were
+	// waiting, and to ensure the bucket range doesn't change.
+	bin := hotBucket.getBin(v)
+	if hotBucket.recordBucket(bin) {
+		return true
+	}
+	hotBucket.startEndMux.Lock()
+	defer hotBucket.startEndMux.Unlock()
+
+	// Since recordBucket failed above, we know we need a scale change.
+	scaleDelta := hotBucket.scaleChange(bin)
+	if hotBucket.scale-scaleDelta < expoMinScale {
+		// With a scale of -10 there are only two buckets for the whole range
+		// of float64 values. This can only happen if there is a max size of 1.
+		otel.Handle(errors.New("exponential histogram scale underflow"))
+		return false
+	}
+	// Copy scale and min/max to cold
+	coldIdx := (hotIdx + 1) % 2
+	coldBucket := &b.hotColdBuckets[coldIdx]
+	coldBucket.scale = hotBucket.scale
+	startBin, endBin := hotBucket.startAndEnd.Load()
+	coldBucket.startAndEnd.Store(startBin, endBin)
+	// Downscale cold to the new scale
+	coldBucket.downscale(scaleDelta)
+	// Expand the cold prior to swapping to hot to ensure our measurement fits.
+	bin = coldBucket.getBin(v)
+	coldBucket.resizeToInclude(bin)
+
+	coldBucket.startEndMux.Lock()
+	defer coldBucket.startEndMux.Unlock()
+
+	b.hcwg.swapHotAndWait()
+	// Now that hot has become cold, downscale it, and merge it into the new hot buckets.
+	hotBucket.downscale(scaleDelta)
+	hotBucket.mergeIntoAndReset(coldBucket, b.maxScale)
+
+	return coldBucket.recordBucket(bin)
+}
+
+// mergeIntoAndReset merges the values of one hotColdExpoBuckets into another
+// and resets b. The caller must already have exclusive access to b; into can
+// accept measurements concurrently with mergeIntoAndReset.
+func (b *hotColdExpoBuckets) mergeIntoAndReset(into *hotColdExpoBuckets) {
+	// Unify scales first; it is the only step below that must synchronize
+	// with writers concurrently recording into "into".
+	b.unifyScale(into)
+	bHotIdx := b.hcwg.loadHot()
+	bBuckets := &b.hotColdBuckets[bHotIdx]
+	intoHotIdx := into.hcwg.loadHot()
+	intoBuckets := &into.hotColdBuckets[intoHotIdx]
+
+	startBin, endBin := bBuckets.startAndEnd.Load()
+	if startBin != endBin {
+		intoBuckets.resizeToInclude(startBin)
+		intoBuckets.resizeToInclude(endBin - 1)
+	}
+	scaleDelta := intoBuckets.scaleChange(endBin - 1)
+	if scaleDelta > 0 {
+		// Merging buckets required a scale change to the positive buckets to
+		// fit within the max scale. Update scale and scale down the negative
+		// buckets to match.
+		b.downscale(scaleDelta, bHotIdx)
+		into.downscale(scaleDelta, intoHotIdx)
+	}
+	b.hotColdBuckets[b.hcwg.loadHot()].mergeIntoAndReset(&into.hotColdBuckets[into.hcwg.loadHot()], b.maxScale)
+}
+
+// unifyScale downscales buckets as needed to make the scale of b and other
+// the same. It returns the resulting scale. The caller must have exclusive
+// access to both hotColdExpoBuckets.
+func (b *hotColdExpoBuckets) unifyScale(other *hotColdExpoBuckets) int32 {
+	bHotIdx := b.hcwg.loadHot()
+	bScale := b.hotColdBuckets[bHotIdx].scale
+	otherHotIdx := other.hcwg.loadHot()
+	otherScale := other.hotColdBuckets[otherHotIdx].scale
+	if bScale < otherScale {
+		other.downscale(otherScale-bScale, otherHotIdx)
+	} else if bScale > otherScale {
+		b.downscale(bScale-otherScale, bHotIdx)
+	}
+	return min(bScale, otherScale)
+}
+
+// downscale force-downscales the bucket. It is assumed that the new scale is valid.
+// The caller must hold the rescale mux.
+func (b *hotColdExpoBuckets) downscale(delta int32, hotIdx uint64) {
+	// Copy scale and min/max to cold
+	coldIdx := (hotIdx + 1) % 2
+	coldBucket := &b.hotColdBuckets[coldIdx]
+	hotBucket := &b.hotColdBuckets[hotIdx]
+	coldBucket.scale = hotBucket.scale
+	startBin, endBin := hotBucket.startAndEnd.Load()
+
+	coldBucket.startAndEnd.Store(startBin, endBin)
+	// Downscale cold to the new scale
+	coldBucket.downscale(delta)
+
+	b.hcwg.swapHotAndWait()
+
+	// Now that hot has become cold, downscale it, and merge it into the new hot buckets.
+	hotBucket.downscale(delta)
+	hotBucket.mergeIntoAndReset(coldBucket, b.maxScale)
+}
+
+// loadCountsAndOffset loads the bucket counts into the provided slice and
+// returns the total count and the offset. It is not safe to call concurrently.
+func (b *hotColdExpoBuckets) loadCountsAndOffset(buckets *[]uint64) (uint64, int32) {
+	return b.hotColdBuckets[b.hcwg.loadHot()].loadCountsAndOffset(buckets)
+}
+
+// expoBuckets is a set of buckets in an exponential histogram.
+type expoBuckets struct {
+	scale       int32
+	startEndMux sync.Mutex
+	startAndEnd atomicLimitedRange
+	counts      []atomic.Uint64
+}
+
+func newExpoBuckets(maxSize int, maxScale int32) expoBuckets {
+	return expoBuckets{
+		scale:  maxScale,
+		counts: make([]atomic.Uint64, maxSize),
+	}
+}
+
+// getIdx returns the index into counts for the provided bin.
+func (e *expoBuckets) getIdx(bin int32) int {
+	newBin := int(bin) % len(e.counts)
+	return (newBin + len(e.counts)) % len(e.counts)
+}
+
+// loadCountsAndOffset loads the bucket counts into the provided slice and
+// returns the total count and the offset. It is not safe to call concurrently.
+func (e *expoBuckets) loadCountsAndOffset(into *[]uint64) (uint64, int32) {
+	// TODO (#3047): Making copies for bounds and counts incurs a large
+	// memory allocation footprint. Alternatives should be explored.
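+	// The counts slice is a fixed-size ring buffer indexed through getIdx,
+	// so the logical range [start, end) may wrap around the physical slice.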
+	start, end := e.startAndEnd.Load()
+	length := int(end - start)
+	counts := reset(*into, length, length)
+	count := uint64(0)
+	eIdx := start
+	for i := range length {
+		val := e.counts[e.getIdx(eIdx)].Load()
+		counts[i] = val
+		count += val
+		eIdx++
+	}
+	*into = counts
+	return count, start
 }
 
 // getBin returns the bin v should be recorded into.
-func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 {
+func (p *expoBuckets) getBin(v float64) int32 {
 	frac, expInt := math.Frexp(v)
 	// 11-bit exponential.
 	exp := int32(expInt) // nolint: gosec
@@ -168,23 +446,24 @@ var scaleFactors = [21]float64{
 
 // scaleChange returns the magnitude of the scale change needed to fit bin in
 // the bucket. If no scale change is needed, 0 is returned.
-func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
-	if length == 0 {
+func (b *expoBuckets) scaleChange(bin int32) int32 {
+	startBin, endBin := b.startAndEnd.Load()
+	if startBin == endBin {
 		// No need to rescale if there are no buckets.
 		return 0
 	}
 
-	low := int(startBin)
-	high := int(bin)
-	if startBin >= bin {
-		low = int(bin)
-		high = int(startBin) + length - 1
+	lastBin := endBin - 1
+	if bin < startBin {
+		startBin = bin
+	} else if bin > lastBin {
+		lastBin = bin
 	}
 
 	var count int32
-	for high-low >= p.maxSize {
-		low >>= 1
-		high >>= 1
+	// Each scale decrease of 1 halves the bin indexes, so shift until the
+	// span fits within len(b.counts).
+	for lastBin-startBin >= int32(len(b.counts)) {
+		startBin >>= 1
+		lastBin >>= 1
 		count++
 		if count > expoMaxScale-expoMinScale {
 			return count
@@ -193,67 +472,20 @@ func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int)
 	return count
 }
 
-// expoBuckets is a set of buckets in an exponential histogram.
-type expoBuckets struct {
-	startBin int32
-	counts   []uint64
-}
-
-// record increments the count for the given bin, and expands the buckets if needed.
-// Size changes must be done before calling this function.
-func (b *expoBuckets) record(bin int32) {
-	if len(b.counts) == 0 {
-		b.counts = []uint64{1}
-		b.startBin = bin
-		return
-	}
-
-	endBin := int(b.startBin) + len(b.counts) - 1
-
-	// if the new bin is inside the current range
-	if bin >= b.startBin && int(bin) <= endBin {
-		b.counts[bin-b.startBin]++
-		return
-	}
-	// if the new bin is before the current start add spaces to the counts
-	if bin < b.startBin {
-		origLen := len(b.counts)
-		newLength := endBin - int(bin) + 1
-		shift := b.startBin - bin
-
-		if newLength > cap(b.counts) {
-			b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
-		}
-
-		copy(b.counts[shift:origLen+int(shift)], b.counts)
-		b.counts = b.counts[:newLength]
-		for i := 1; i < int(shift); i++ {
-			b.counts[i] = 0
-		}
-		b.startBin = bin
-		b.counts[0] = 1
-		return
-	}
-	// if the new is after the end add spaces to the end
-	if int(bin) > endBin {
-		if int(bin-b.startBin) < cap(b.counts) {
-			b.counts = b.counts[:bin-b.startBin+1]
-			for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
-				b.counts[i] = 0
-			}
-			b.counts[bin-b.startBin] = 1
-			return
-		}
-
-		end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
-		b.counts = append(b.counts, end...)
-		b.counts[bin-b.startBin] = 1
+// recordBucket returns true if the bucket count was incremented, or false if
+// the bin falls outside the current range and a resize or downscale is
+// required before it can be recorded.
+func (b *expoBuckets) recordBucket(bin int32) bool {
+	startBin, endBin := b.startAndEnd.Load()
+	if bin >= startBin && bin < endBin {
+		b.counts[b.getIdx(bin)].Add(1)
+		return true
 	}
+	return false
 }
 
 // downscale shrinks a bucket by a factor of 2*s. It will sum counts into the
-// correct lower resolution bucket.
+// correct lower resolution bucket. downscale is not safe to call concurrently.
 func (b *expoBuckets) downscale(delta int32) {
+	b.scale -= delta
 	// Example
 	// delta = 2
 	// Original offset: -6
 	// Original counts: [3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 	// new Offset: -2
 	// new Counts: [4, 14, 30, 10]
 
-	if len(b.counts) <= 1 || delta < 1 {
-		b.startBin >>= delta
+	startBin, endBin := b.startAndEnd.Load()
+	length := endBin - startBin
+	if length <= 1 || delta < 1 {
+		newStartBin := startBin >> delta
+		newEndBin := newStartBin + length
+		b.startAndEnd.Store(newStartBin, newEndBin)
+		// Shift all elements left by the change in start position
+		startShift := b.getIdx(startBin - newStartBin)
+		b.counts = append(b.counts[startShift:], b.counts[:startShift]...)
+
+		// Clear all elements that are outside of our start to end range
+		for i := newEndBin; i < newStartBin+int32(len(b.counts)); i++ {
+			b.counts[b.getIdx(i)].Store(0)
+		}
 		return
 	}
 
 	steps := int32(1) << delta
-	offset := b.startBin % steps
+	offset := startBin % steps
 	offset = (offset + steps) % steps // to make offset positive
-	for i := 1; i < len(b.counts); i++ {
-		idx := i + int(offset)
-		if idx%int(steps) == 0 {
-			b.counts[idx/int(steps)] = b.counts[i]
+	newLen := (length-1+offset)/steps + 1
+	newStartBin := startBin >> delta
+	newEndBin := newStartBin + newLen
+	startShift := b.getIdx(startBin - newStartBin)
+
+	for i := startBin + 1; i < endBin; i++ {
+		newIdx := b.getIdx(int32(math.Floor(float64(i)/float64(steps))) + int32(startShift))
+		if i%steps == 0 {
+			b.counts[newIdx].Store(b.counts[b.getIdx(i)].Load())
 			continue
 		}
-		b.counts[idx/int(steps)] += b.counts[i]
+		b.counts[newIdx].Add(b.counts[b.getIdx(i)].Load())
+	}
+	b.startAndEnd.Store(newStartBin, newEndBin)
+	// Shift all elements left by the change in start position
+	b.counts = append(b.counts[startShift:], b.counts[:startShift]...)
+
+	// Clear all elements that are outside of our start to end range
+	for i := newEndBin; i < newStartBin+int32(len(b.counts)); i++ {
+		b.counts[b.getIdx(i)].Store(0)
+	}
+}
+
+// resizeToInclude force-expands the range of b to include the bin.
+// resizeToInclude is not safe to call concurrently.
+func (b *expoBuckets) resizeToInclude(bin int32) {
+	b.startEndMux.Lock()
+	defer b.startEndMux.Unlock()
+	startBin, endBin := b.startAndEnd.Load()
+	if startBin == endBin {
+		startBin = bin
+		endBin = bin + 1
+		b.startAndEnd.Store(startBin, endBin)
+	} else if bin < startBin {
+		b.startAndEnd.Store(bin, endBin)
+	} else if bin >= endBin {
+		b.startAndEnd.Store(startBin, bin+1)
+	}
 }
 
-	lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
-	b.counts = b.counts[:lastIdx+1]
-	b.startBin >>= delta
+// mergeIntoAndReset merges this expoBuckets into another, and resets the
+// state of the expoBuckets. This is used to ensure that the cumulative
+// counters continue to accumulate after being read.
+// mergeIntoAndReset requires that the scales are equal.
+func (b *expoBuckets) mergeIntoAndReset(into *expoBuckets, maxScale int32) {
+	if b.scale != into.scale {
+		panic("scales not equal")
+	}
+	intoStartBin, intoEndBin := into.startAndEnd.Load()
+	bStartBin, bEndBin := b.startAndEnd.Load()
+	b.startAndEnd.Store(0, 0)
+	if bStartBin != bEndBin && (intoStartBin > bStartBin || intoEndBin < bEndBin) {
+		panic(fmt.Sprintf("into is not a superset of b. intoStartBin %v, bStartBin %v, intoEndBin %v, bEndBin %v", intoStartBin, bStartBin, intoEndBin, bEndBin))
+	}
+	for i := bStartBin; i < bEndBin; i++ {
+		// Swap in 0 to reset.
+		val := b.counts[b.getIdx(i)].Swap(0)
+		into.counts[into.getIdx(i)].Add(val)
+	}
+	b.scale = maxScale
+}
 
-// newExponentialHistogram returns an Aggregator that summarizes a set of
-// measurements as an exponential histogram. Each histogram is scoped by attributes
-// and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](
+// newDeltaExponentialHistogram returns an Aggregator that summarizes a set of
+// measurements as a delta exponential histogram. Each histogram is scoped by
+// attributes and the aggregation cycle the measurements were made in.
+func newDeltaExponentialHistogram[N int64 | float64](
 	maxSize, maxScale int32,
 	noMinMax, noSum bool,
 	limit int,
 	r func(attribute.Set) FilteredExemplarReservoir[N],
-) *expoHistogram[N] {
-	return &expoHistogram[N]{
+) *deltaExpoHistogram[N] {
+	return &deltaExpoHistogram[N]{
 		noSum:    noSum,
 		noMinMax: noMinMax,
 		maxSize:  int(maxSize),
 		maxScale: maxScale,
 
 		newRes: r,
-		limit:  newLimiter[expoHistogramDataPoint[N]](limit),
-		values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
+		hotColdValMap: [2]limitedSyncMap{
+			{aggLimit: limit},
+			{aggLimit: limit},
+		},
 
 		start: now(),
 	}
 }
 
-// expoHistogram summarizes a set of measurements as an histogram with exponentially
+// deltaExpoHistogram summarizes a set of measurements as a histogram with exponentially
 // defined buckets.
-type expoHistogram[N int64 | float64] struct {
+type deltaExpoHistogram[N int64 | float64] struct {
 	noSum    bool
 	noMinMax bool
 	maxSize  int
 	maxScale int32
 
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	limit    limiter[expoHistogramDataPoint[N]]
-	values   map[attribute.Distinct]*expoHistogramDataPoint[N]
-	valuesMu sync.Mutex
+	newRes        func(attribute.Set) FilteredExemplarReservoir[N]
+	hcwg          hotColdWaitGroup
+	hotColdValMap [2]limitedSyncMap
 
 	start time.Time
 }
 
-func (e *expoHistogram[N]) measure(
+func (e *deltaExpoHistogram[N]) measure(
 	ctx context.Context,
 	value N,
 	fltrAttr attribute.Set,
@@ -335,27 +628,22 @@
 		return
 	}
 
-	e.valuesMu.Lock()
-	defer e.valuesMu.Unlock()
-
-	v, ok := e.values[fltrAttr.Equivalent()]
-	if !ok {
-		fltrAttr = e.limit.Attributes(fltrAttr, e.values)
-		// If we overflowed, make sure we add to the existing overflow series
-		// if it already exists.
- v, ok = e.values[fltrAttr.Equivalent()] - if !ok { - v = newExpoHistogramDataPoint[N](fltrAttr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) - v.res = e.newRes(fltrAttr) - - e.values[fltrAttr.Equivalent()] = v - } + hotIdx := e.hcwg.start() + defer e.hcwg.done(hotIdx) + v := e.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + hPt := newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale) + hPt.res = e.newRes(attr) + return hPt + }).(*expoHistogramDataPoint[N]) + if !v.tryFastRecord(value, e.noMinMax, e.noSum) { + v.rescaleMux.Lock() + v.record(value, e.noMinMax, e.noSum) + v.rescaleMux.Unlock() } - v.record(value) v.res.Offer(ctx, value, droppedAttr) } -func (e *expoHistogram[N]) delta( +func (e *deltaExpoHistogram[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() @@ -365,52 +653,32 @@ func (e *expoHistogram[N]) delta( h, _ := (*dest).(metricdata.ExponentialHistogram[N]) h.Temporality = metricdata.DeltaTemporality - e.valuesMu.Lock() - defer e.valuesMu.Unlock() + // delta always clears values on collection + readIdx := e.hcwg.swapHotAndWait() - n := len(e.values) + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len. + n := e.hotColdValMap[readIdx].Len() hDPts := reset(h.DataPoints, n, n) var i int - for _, val := range e.values { + e.hotColdValMap[readIdx].Range(func(_, value any) bool { + val := value.(*expoHistogramDataPoint[N]) hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t - hDPts[i].Count = val.count - hDPts[i].Scale = val.scale - hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 - hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin - hDPts[i].PositiveBucket.Counts = reset( - hDPts[i].PositiveBucket.Counts, - len(val.posBuckets.counts), - len(val.posBuckets.counts), - ) - copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) - - hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin - hDPts[i].NegativeBucket.Counts = reset( - hDPts[i].NegativeBucket.Counts, - len(val.negBuckets.counts), - len(val.negBuckets.counts), - ) - copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) - - if !e.noSum { - hDPts[i].Sum = val.sum - } - if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) - } - + val.rescaleMux.Lock() + defer val.rescaleMux.Unlock() + val.loadInto(&hDPts[i], e.noMinMax, e.noSum) collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ - } + return true + }) // Unused attribute sets do not report. - clear(e.values) + e.hotColdValMap[readIdx].Clear() e.start = t h.DataPoints = hDPts @@ -418,7 +686,73 @@ func (e *expoHistogram[N]) delta( return n } -func (e *expoHistogram[N]) cumulative( +// newCumulativeExponentialHistogram returns an Aggregator that summarizes a +// set of measurements as a cumulative exponential histogram. Each histogram is +// scoped by attributes and the aggregation cycle the measurements were made +// in. 
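+// Unlike the delta variant, point values are never cleared on collection;
+// each point's counters are swapped hot/cold, read, and merged back so
+// totals keep accumulating.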
+func newCumulativeExponentialHistogram[N int64 | float64](
+	maxSize, maxScale int32,
+	noMinMax, noSum bool,
+	limit int,
+	r func(attribute.Set) FilteredExemplarReservoir[N],
+) *cumulativeExpoHistogram[N] {
+	return &cumulativeExpoHistogram[N]{
+		noSum:    noSum,
+		noMinMax: noMinMax,
+		maxSize:  int(maxSize),
+		maxScale: maxScale,
+
+		newRes: r,
+		values: limitedSyncMap{aggLimit: limit},
+
+		start: now(),
+	}
+}
+
+// cumulativeExpoHistogram summarizes a set of measurements as a cumulative
+// histogram with exponentially defined buckets.
+type cumulativeExpoHistogram[N int64 | float64] struct {
+	noSum    bool
+	noMinMax bool
+	maxSize  int
+	maxScale int32
+
+	newRes func(attribute.Set) FilteredExemplarReservoir[N]
+	values limitedSyncMap
+
+	start time.Time
+}
+
+func (e *cumulativeExpoHistogram[N]) measure(
+	ctx context.Context,
+	value N,
+	fltrAttr attribute.Set,
+	droppedAttr []attribute.KeyValue,
+) {
+	// Ignore NaN and infinity.
+	if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
+		return
+	}
+
+	v := e.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
+		hPt := newHotColdExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale)
+		hPt.res = e.newRes(attr)
+		return hPt
+	}).(*hotColdExpoHistogramPoint[N])
+
+	hotIdx := v.hcwg.start()
+	fastRecordSuccess := v.hotColdPoint[hotIdx].tryFastRecord(value, e.noMinMax, e.noSum)
+	v.hcwg.done(hotIdx)
+	if !fastRecordSuccess {
+		v.rescaleMux.Lock()
+		// We hold the lock, so there is no need to use start/done from hcwg.
+		v.hotColdPoint[v.hcwg.loadHot()].record(value, e.noMinMax, e.noSum)
+		v.rescaleMux.Unlock()
+	}
+	v.res.Offer(ctx, value, droppedAttr)
+}
+
+func (e *cumulativeExpoHistogram[N]) collect(
 	dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
 ) int {
 	t := now()
@@ -428,56 +762,68 @@
 	h, _ := (*dest).(metricdata.ExponentialHistogram[N])
 	h.Temporality = metricdata.CumulativeTemporality
 
-	e.valuesMu.Lock()
-	defer e.valuesMu.Unlock()
-
-	n := len(e.values)
-	hDPts := reset(h.DataPoints, n, n)
+	// Values are being concurrently written while we iterate, so only use the
+	// current length for capacity.
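+	// Attribute sets added concurrently may be observed now or on a later
+	// collection cycle.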
+ hDPts := reset(h.DataPoints, 0, e.values.Len()) var i int - for _, val := range e.values { - hDPts[i].Attributes = val.attrs - hDPts[i].StartTime = e.start - hDPts[i].Time = t - hDPts[i].Count = val.count - hDPts[i].Scale = val.scale - hDPts[i].ZeroCount = val.zeroCount - hDPts[i].ZeroThreshold = 0.0 - - hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin - hDPts[i].PositiveBucket.Counts = reset( - hDPts[i].PositiveBucket.Counts, - len(val.posBuckets.counts), - len(val.posBuckets.counts), - ) - copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) - - hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin - hDPts[i].NegativeBucket.Counts = reset( - hDPts[i].NegativeBucket.Counts, - len(val.negBuckets.counts), - len(val.negBuckets.counts), - ) - copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) - - if !e.noSum { - hDPts[i].Sum = val.sum - } - if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + e.values.Range(func(_, value any) bool { + val := value.(*hotColdExpoHistogramPoint[N]) + newPt := metricdata.ExponentialHistogramDataPoint[N]{ + Attributes: val.attrs, + StartTime: e.start, + Time: t, + ZeroThreshold: 0.0, } - - collectExemplars(&hDPts[i].Exemplars, val.res.Collect) + val.rescaleMux.Lock() + defer val.rescaleMux.Unlock() + // Prevent buckets from changing start or end ranges during collection + // so mergeInto below succeeds. + hotIdx := val.hcwg.loadHot() + val.hotColdPoint[hotIdx].posBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].posBuckets.hcwg.loadHot()].startEndMux.Lock() + defer val.hotColdPoint[hotIdx].posBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].posBuckets.hcwg.loadHot()].startEndMux.Unlock() + val.hotColdPoint[hotIdx].negBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].negBuckets.hcwg.loadHot()].startEndMux.Lock() + defer val.hotColdPoint[hotIdx].negBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].negBuckets.hcwg.loadHot()].startEndMux.Unlock() + + // Set the range of the cold point to the hot range to ensure we don't + // accept measurements that would have resulted in an underflow, and + // would block mergeInto below. + coldIdx := (hotIdx + 1) % 2 + start, end := val.hotColdPoint[hotIdx].posBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].posBuckets.hcwg.loadHot()].startAndEnd.Load() + val.hotColdPoint[coldIdx].posBuckets.hotColdBuckets[val.hotColdPoint[coldIdx].posBuckets.hcwg.loadHot()].startAndEnd.Store(start, end) + start, end = val.hotColdPoint[hotIdx].negBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].negBuckets.hcwg.loadHot()].startAndEnd.Load() + val.hotColdPoint[coldIdx].negBuckets.hotColdBuckets[val.hotColdPoint[coldIdx].negBuckets.hcwg.loadHot()].startAndEnd.Store(start, end) + + // Set the scale to the minimum of the pos and negative bucket scale. 
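+		// A single data point carries one scale shared by the positive and
+		// negative buckets, so the side with the larger scale must be
+		// downscaled to match before both are read into the same point.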
+ posScale := val.hotColdPoint[hotIdx].posBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].posBuckets.hcwg.loadHot()].scale + negScale := val.hotColdPoint[hotIdx].negBuckets.hotColdBuckets[val.hotColdPoint[hotIdx].negBuckets.hcwg.loadHot()].scale + newScale := min(posScale, negScale) + val.hotColdPoint[coldIdx].posBuckets.hotColdBuckets[val.hotColdPoint[coldIdx].posBuckets.hcwg.loadHot()].scale = newScale + val.hotColdPoint[coldIdx].negBuckets.hotColdBuckets[val.hotColdPoint[coldIdx].negBuckets.hcwg.loadHot()].scale = newScale + + // Swap so we can read from hot + readIdx := val.hcwg.swapHotAndWait() + readPt := &val.hotColdPoint[readIdx] + readPt.loadInto(&newPt, e.noMinMax, e.noSum) + // Once we've read the point, merge it back into the now-hot histogram + // point since it is cumulative. + readPt.mergeIntoAndReset(&val.hotColdPoint[(readIdx+1)%2], e.noMinMax, e.noSum) + + collectExemplars(&newPt.Exemplars, val.res.Collect) + hDPts = append(hDPts, newPt) i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. - } + return true + }) h.DataPoints = hDPts *dest = h - return n + return i } + +// Aquire lock first, then use HCWG for read +// Aquire lock first, then use HCWG for write diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index a27bdc73a54..13896c611d1 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "math" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -16,6 +17,10 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +const smallestNonZeroNormalFloat64 = 0x1p-1022 + +const defaultMaxSize = 20 + type noErrorHandler struct{ t *testing.T } func (h *noErrorHandler) Handle(e error) { @@ -30,6 +35,37 @@ func withHandler(t *testing.T) func() { return func() { global.SetErrorHandler(original) } } +func newTestExpoBuckets(startBin, scale, maxSize int32, counts []uint64) *expoBuckets { + e := &expoBuckets{ + scale: scale, + counts: make([]atomic.Uint64, maxSize), + startAndEnd: atomicLimitedRange{maxSize: maxSize}, + } + for i := range counts { + e.counts[e.getIdx(startBin+int32(i))].Store(counts[i]) + } + e.startAndEnd.Store(startBin, startBin+int32(len(counts))) + return e +} + +type expectedExpoBuckets struct { + startBin int32 + counts []uint64 + scale int32 +} + +func (e *expectedExpoBuckets) AssertEqualHotCold(t *testing.T, got *hotColdExpoBuckets) { + e.AssertEqual(t, &got.hotColdBuckets[got.hcwg.loadHot()]) +} + +func (e *expectedExpoBuckets) AssertEqual(t *testing.T, got *expoBuckets) { + var gotCounts []uint64 + _, startBin := got.loadCountsAndOffset(&gotCounts) + assert.Equal(t, e.startBin, startBin, "start bin") + assert.Equal(t, e.counts, gotCounts, "counts") + assert.Equal(t, e.scale, got.scale, "scale") +} + func TestExpoHistogramDataPointRecord(t *testing.T) { t.Run("float64", testExpoHistogramDataPointRecord[float64]) t.Run("float64 MinMaxSum", testExpoHistogramMinMaxSumFloat64) @@ -42,87 +78,87 @@ func testExpoHistogramDataPointRecord[N int64 | float64](t *testing.T) { testCases := []struct { maxSize int values []N - expectedBuckets expoBuckets + expectedBuckets expectedExpoBuckets expectedScale int32 }{ { maxSize: 4, values: []N{2, 4, 1}, - expectedBuckets: 
expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 1, 1}, + scale: 0, }, - expectedScale: 0, }, { maxSize: 4, values: []N{4, 4, 4, 2, 16, 1}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 4, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []N{1, 2, 4}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 2}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []N{1, 4, 2}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 2}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []N{2, 4, 1}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 2}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []N{2, 1, 4}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 2}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []N{4, 1, 2}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 2}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []N{4, 2, 1}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 2}, + scale: -1, }, - expectedScale: -1, }, } for _, tt := range testCases { @@ -130,15 +166,14 @@ func testExpoHistogramDataPointRecord[N int64 | float64](t *testing.T) { restore := withHandler(t) defer restore() - dp := newExpoHistogramDataPoint[N](alice, tt.maxSize, 20, false, false) + dp := newExpoHistogramDataPoint[N](alice, tt.maxSize, 20) for _, v := range tt.values { - dp.record(v) - dp.record(-v) + dp.record(v, false, false) + dp.record(-v, false, false) } - assert.Equal(t, tt.expectedBuckets, dp.posBuckets, "positive buckets") - assert.Equal(t, tt.expectedBuckets, dp.negBuckets, "negative buckets") - assert.Equal(t, tt.expectedScale, dp.scale, "scale") + tt.expectedBuckets.AssertEqualHotCold(t, &dp.posBuckets) + tt.expectedBuckets.AssertEqualHotCold(t, &dp.negBuckets) }) } } @@ -172,15 +207,20 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) { restore := withHandler(t) defer restore() - h := newExponentialHistogram[int64](4, 20, false, false, 0, dropExemplars[int64]) + h := newCumulativeExponentialHistogram[int64](4, 20, false, false, 0, dropExemplars[int64]) for _, v := range tt.values { h.measure(t.Context(), v, alice, nil) } - dp := h.values[alice.Equivalent()] + val, ok := h.values.Load(alice.Equivalent()) - assert.Equal(t, tt.expected.max, dp.max) - assert.Equal(t, tt.expected.min, dp.min) - assert.InDelta(t, tt.expected.sum, dp.sum, 0.01) + assert.True(t, ok) + dp := val.(*hotColdExpoHistogramPoint[int64]) + readIdx := dp.hcwg.swapHotAndWait() + + assert.True(t, dp.hotColdPoint[readIdx].minMax.set.Load()) + assert.Equal(t, tt.expected.max, dp.hotColdPoint[readIdx].minMax.maximum.Load()) + assert.Equal(t, tt.expected.min, dp.hotColdPoint[readIdx].minMax.minimum.Load()) + assert.InDelta(t, tt.expected.sum, dp.hotColdPoint[readIdx].sum.load(), 0.01) }) } } @@ -214,15 +254,20 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) { restore := withHandler(t) defer restore() - h := newExponentialHistogram[float64](4, 20, false, false, 0, dropExemplars[float64]) + h := newDeltaExponentialHistogram[float64](4, 20, false, false, 0, dropExemplars[float64]) for _, v := range tt.values { 
h.measure(t.Context(), v, alice, nil) } - dp := h.values[alice.Equivalent()] + readIdx := h.hcwg.swapHotAndWait() + val, ok := h.hotColdValMap[readIdx].Load(alice.Equivalent()) + + assert.True(t, ok) + dp := val.(*expoHistogramDataPoint[float64]) - assert.Equal(t, tt.expected.max, dp.max) - assert.Equal(t, tt.expected.min, dp.min) - assert.InDelta(t, tt.expected.sum, dp.sum, 0.01) + assert.True(t, dp.minMax.set.Load()) + assert.Equal(t, tt.expected.max, dp.minMax.maximum.Load()) + assert.Equal(t, tt.expected.min, dp.minMax.minimum.Load()) + assert.InDelta(t, tt.expected.sum, dp.sum.load(), 0.01) }) } } @@ -231,7 +276,7 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) { type TestCase struct { maxSize int values []float64 - expectedBuckets expoBuckets + expectedBuckets expectedExpoBuckets expectedScale int32 } @@ -239,65 +284,65 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) { { maxSize: 4, values: []float64{2, 2, 2, 1, 8, 0.5}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 3, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []float64{1, 0.5, 2}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []float64{1, 2, 0.5}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []float64{2, 0.5, 1}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []float64{2, 1, 0.5}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []float64{0.5, 1, 2}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 1}, + scale: -1, }, - expectedScale: -1, }, { maxSize: 2, values: []float64{0.5, 2, 1}, - expectedBuckets: expoBuckets{ + expectedBuckets: expectedExpoBuckets{ startBin: -1, counts: []uint64{2, 1}, + scale: -1, }, - expectedScale: -1, }, } for _, tt := range testCases { @@ -305,15 +350,15 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) { restore := withHandler(t) defer restore() - dp := newExpoHistogramDataPoint[float64](alice, tt.maxSize, 20, false, false) + dp := newExpoHistogramDataPoint[float64](alice, tt.maxSize, 20) for _, v := range tt.values { - dp.record(v) - dp.record(-v) + dp.record(v, false, false) + dp.record(-v, false, false) } - assert.Equal(t, tt.expectedBuckets, dp.posBuckets) - assert.Equal(t, tt.expectedBuckets, dp.negBuckets) - assert.Equal(t, tt.expectedScale, dp.scale) + dp.posBuckets.unifyScale(&dp.negBuckets) + tt.expectedBuckets.AssertEqualHotCold(t, &dp.posBuckets) + tt.expectedBuckets.AssertEqualHotCold(t, &dp.negBuckets) }) } } @@ -322,171 +367,156 @@ func TestExponentialHistogramDataPointRecordLimits(t *testing.T) { // These bins are calculated from the following formula: // floor( log2( value) * 2^20 ) using an arbitrary precision calculator. 
- fdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) - fdp.record(math.MaxFloat64) + fdp := newExpoHistogramDataPoint[float64](alice, 4, 20) + fdp.record(math.MaxFloat64, false, false) - if fdp.posBuckets.startBin != 1073741823 { - t.Errorf("Expected startBin to be 1073741823, got %d", fdp.posBuckets.startBin) + readIdx := fdp.posBuckets.hcwg.loadHot() + startBin, _ := fdp.posBuckets.hotColdBuckets[readIdx].startAndEnd.Load() + if startBin != 1073741823 { + t.Errorf("Expected startBin to be 1073741823, got %d", startBin) } - fdp = newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) - fdp.record(math.SmallestNonzeroFloat64) + fdp = newExpoHistogramDataPoint[float64](alice, 4, 20) + fdp.record(math.SmallestNonzeroFloat64, false, false) - if fdp.posBuckets.startBin != -1126170625 { - t.Errorf("Expected startBin to be -1126170625, got %d", fdp.posBuckets.startBin) + readIdx = fdp.posBuckets.hcwg.loadHot() + startBin, _ = fdp.posBuckets.hotColdBuckets[readIdx].startAndEnd.Load() + if startBin != -1126170625 { + t.Errorf("Expected startBin to be -1126170625, got %d", startBin) } - idp := newExpoHistogramDataPoint[int64](alice, 4, 20, false, false) - idp.record(math.MaxInt64) + idp := newExpoHistogramDataPoint[int64](alice, 4, 20) + idp.record(math.MaxInt64, false, false) - if idp.posBuckets.startBin != 66060287 { - t.Errorf("Expected startBin to be 66060287, got %d", idp.posBuckets.startBin) + readIdx = idp.posBuckets.hcwg.loadHot() + startBin, _ = idp.posBuckets.hotColdBuckets[readIdx].startAndEnd.Load() + if startBin != 66060287 { + t.Errorf("Expected startBin to be 66060287, got %d", startBin) } } -func TestExpoBucketDownscale(t *testing.T) { +func TestExpoBucketDownscale(t *testing.T) { // TODO FIX!!!!!!!!!!! tests := []struct { name string bucket *expoBuckets scale int32 - want *expoBuckets + want *expectedExpoBuckets }{ { name: "Empty bucket", - bucket: &expoBuckets{}, + bucket: newTestExpoBuckets(0, 0, defaultMaxSize, nil), scale: 3, - want: &expoBuckets{}, + want: &expectedExpoBuckets{ + scale: -3, + }, }, { - name: "1 size bucket", - bucket: &expoBuckets{ - startBin: 50, - counts: []uint64{7}, - }, - scale: 4, - want: &expoBuckets{ + name: "1 size bucket", + bucket: newTestExpoBuckets(50, 0, defaultMaxSize, []uint64{7}), + scale: 4, + want: &expectedExpoBuckets{ startBin: 3, counts: []uint64{7}, + scale: -4, }, }, { - name: "zero scale", - bucket: &expoBuckets{ - startBin: 50, - counts: []uint64{7, 5}, - }, - scale: 0, - want: &expoBuckets{ + name: "zero scale", + bucket: newTestExpoBuckets(50, 0, defaultMaxSize, []uint64{7, 5}), + scale: 0, + want: &expectedExpoBuckets{ startBin: 50, counts: []uint64{7, 5}, }, }, { - name: "aligned bucket scale 1", - bucket: &expoBuckets{ - startBin: 0, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, - scale: 1, - want: &expoBuckets{ + name: "aligned bucket scale 1", + bucket: newTestExpoBuckets(0, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), + scale: 1, + want: &expectedExpoBuckets{ startBin: 0, counts: []uint64{3, 7, 11}, + scale: -1, }, }, { - name: "aligned bucket scale 2", - bucket: &expoBuckets{ - startBin: 0, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, - scale: 2, - want: &expoBuckets{ + name: "aligned bucket scale 2", + bucket: newTestExpoBuckets(0, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), + scale: 2, + want: &expectedExpoBuckets{ startBin: 0, counts: []uint64{10, 11}, + scale: -2, }, }, { - name: "aligned bucket scale 3", - bucket: &expoBuckets{ - startBin: 0, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, - 
scale: 3, - want: &expoBuckets{ + name: "aligned bucket scale 3", + bucket: newTestExpoBuckets(0, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), + scale: 3, + want: &expectedExpoBuckets{ startBin: 0, counts: []uint64{21}, + scale: -3, }, }, { - name: "unaligned bucket scale 1", - bucket: &expoBuckets{ - startBin: 5, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6] - scale: 1, - want: &expoBuckets{ + name: "unaligned bucket scale 1 A", + bucket: newTestExpoBuckets(5, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6] + scale: 1, + want: &expectedExpoBuckets{ startBin: 2, counts: []uint64{1, 5, 9, 6}, + scale: -1, }, // This is equivalent to [0,0,1,5,9,6] }, { - name: "unaligned bucket scale 2", - bucket: &expoBuckets{ - startBin: 7, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, // This is equivalent to [0,0,0,0,0,0,0,1,2,3,4,5,6] - scale: 2, - want: &expoBuckets{ + name: "unaligned bucket scale 2", + bucket: newTestExpoBuckets(7, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), // This is equivalent to [0,0,0,0,0,0,0,1,2,3,4,5,6] + scale: 2, + want: &expectedExpoBuckets{ startBin: 1, counts: []uint64{1, 14, 6}, + scale: -2, }, // This is equivalent to [0,1,14,6] }, { - name: "unaligned bucket scale 3", - bucket: &expoBuckets{ - startBin: 3, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, // This is equivalent to [0,0,0,1,2,3,4,5,6] - scale: 3, - want: &expoBuckets{ + name: "unaligned bucket scale 3", + bucket: newTestExpoBuckets(3, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), // This is equivalent to [0,0,0,1,2,3,4,5,6] + scale: 3, + want: &expectedExpoBuckets{ startBin: 0, counts: []uint64{15, 6}, + scale: -3, }, // This is equivalent to [0,15,6] }, { - name: "unaligned bucket scale 1", - bucket: &expoBuckets{ + name: "unaligned bucket scale 1 B", + bucket: newTestExpoBuckets(2, 0, defaultMaxSize, []uint64{1, 0, 1}), + scale: 1, + want: &expectedExpoBuckets{ startBin: 1, - counts: []uint64{1, 0, 1}, - }, - scale: 1, - want: &expoBuckets{ - startBin: 0, counts: []uint64{1, 1}, + scale: -1, }, }, { - name: "negative startBin", - bucket: &expoBuckets{ - startBin: -1, - counts: []uint64{1, 0, 3}, - }, - scale: 1, - want: &expoBuckets{ + name: "negative startBin", + bucket: newTestExpoBuckets(-1, 0, defaultMaxSize, []uint64{1, 0, 3}), + scale: 1, + want: &expectedExpoBuckets{ startBin: -1, counts: []uint64{1, 3}, + scale: -1, }, }, { - name: "negative startBin 2", - bucket: &expoBuckets{ - startBin: -4, - counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - }, - scale: 1, - want: &expoBuckets{ + name: "negative startBin 2", + bucket: newTestExpoBuckets(-4, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + scale: 1, + want: &expectedExpoBuckets{ startBin: -2, counts: []uint64{3, 7, 11, 15, 19}, + scale: -1, }, }, } @@ -494,7 +524,7 @@ func TestExpoBucketDownscale(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tt.bucket.downscale(tt.scale) - assert.Equal(t, tt.want, tt.bucket) + tt.want.AssertEqual(t, tt.bucket) }) } } @@ -504,49 +534,40 @@ func TestExpoBucketRecord(t *testing.T) { name string bucket *expoBuckets bin int32 - want *expoBuckets + want *expectedExpoBuckets }{ { name: "Empty Bucket creates first count", - bucket: &expoBuckets{}, + bucket: newTestExpoBuckets(0, 0, defaultMaxSize, nil), bin: -5, - want: &expoBuckets{ + want: &expectedExpoBuckets{ startBin: -5, counts: []uint64{1}, }, }, { - name: "Bin is in the bucket", - bucket: &expoBuckets{ - startBin: 3, - counts: []uint64{1, 2, 3, 
4, 5, 6}, - }, - bin: 5, - want: &expoBuckets{ + name: "Bin is in the bucket", + bucket: newTestExpoBuckets(3, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), + bin: 5, + want: &expectedExpoBuckets{ startBin: 3, counts: []uint64{1, 2, 4, 4, 5, 6}, }, }, { - name: "Bin is before the start of the bucket", - bucket: &expoBuckets{ - startBin: 1, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, - bin: -2, - want: &expoBuckets{ + name: "Bin is before the start of the bucket", + bucket: newTestExpoBuckets(1, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), + bin: -2, + want: &expectedExpoBuckets{ startBin: -2, counts: []uint64{1, 0, 0, 1, 2, 3, 4, 5, 6}, }, }, { - name: "Bin is after the end of the bucket", - bucket: &expoBuckets{ - startBin: -2, - counts: []uint64{1, 2, 3, 4, 5, 6}, - }, - bin: 4, - want: &expoBuckets{ + name: "Bin is after the end of the bucket", + bucket: newTestExpoBuckets(-2, 0, defaultMaxSize, []uint64{1, 2, 3, 4, 5, 6}), + bin: 4, + want: &expectedExpoBuckets{ startBin: -2, counts: []uint64{1, 2, 3, 4, 5, 6, 1}, }, @@ -554,95 +575,66 @@ func TestExpoBucketRecord(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.bucket.record(tt.bin) + tt.bucket.resizeToInclude(tt.bin) + tt.bucket.recordBucket(tt.bin) - assert.Equal(t, tt.want, tt.bucket) + tt.want.AssertEqual(t, tt.bucket) }) } } func TestScaleChange(t *testing.T) { - type args struct { - bin int32 - startBin int32 - length int - maxSize int - } tests := []struct { - name string - args args - want int32 + name string + bin int32 + bucket *expoBuckets + want int32 }{ { name: "if length is 0, no rescale is needed", // [] -> [5] Length 1 - args: args{ - bin: 5, - startBin: 0, - length: 0, - maxSize: 4, - }, - want: 0, + bin: 5, + bucket: newTestExpoBuckets(0, 0, 4, nil), + want: 0, }, { name: "if bin is between start, and the end, no rescale needed", // [-1, ..., 8] Length 10 -> [-1, ..., 5, ..., 8] Length 10 - args: args{ - bin: 5, - startBin: -1, - length: 10, - maxSize: 20, - }, - want: 0, + bin: 5, + bucket: newTestExpoBuckets(-1, 0, defaultMaxSize, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), + want: 0, }, { name: "if len([bin,... 
end]) > maxSize, rescale needed", // [8,9,10] Length 3 -> [5, ..., 10] Length 6 - args: args{ - bin: 5, - startBin: 8, - length: 3, - maxSize: 5, - }, - want: 1, + bin: 5, + bucket: newTestExpoBuckets(8, 0, 5, []uint64{0, 1, 2}), + want: 1, }, { name: "if len([start, ..., bin]) > maxSize, rescale needed", // [2,3,4] Length 3 -> [2, ..., 7] Length 6 - args: args{ - bin: 7, - startBin: 2, - length: 3, - maxSize: 5, - }, - want: 1, + bin: 7, + bucket: newTestExpoBuckets(2, 0, 5, []uint64{0, 1, 2}), + want: 1, }, { name: "if len([start, ..., bin]) > maxSize, rescale needed", // [2,3,4] Length 3 -> [2, ..., 7] Length 12 - args: args{ - bin: 13, - startBin: 2, - length: 3, - maxSize: 5, - }, - want: 2, + bin: 13, + bucket: newTestExpoBuckets(2, 0, 5, []uint64{0, 1, 2}), + want: 2, }, { - name: "It should not hang if it will never be able to rescale", - args: args{ - bin: 1, - startBin: -1, - length: 1, - maxSize: 1, - }, - want: 31, + name: "It should not hang if it will never be able to rescale", + bin: 1, + bucket: newTestExpoBuckets(-1, 0, 1, []uint64{0}), + want: 31, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newExpoHistogramDataPoint[float64](alice, tt.args.maxSize, 20, false, false) - got := p.scaleChange(tt.args.bin, tt.args.startBin, tt.args.length) + got := tt.bucket.scaleChange(tt.bin) if got != tt.want { t.Errorf("scaleChange() = %v, want %v", got, tt.want) } @@ -652,10 +644,10 @@ func TestScaleChange(t *testing.T) { func BenchmarkPrepend(b *testing.B) { for i := 0; i < b.N; i++ { - agg := newExpoHistogramDataPoint[float64](alice, 1024, 20, false, false) + agg := newExpoHistogramDataPoint[float64](alice, 1024, 20) n := math.MaxFloat64 for range 1024 { - agg.record(n) + agg.record(n, false, false) n /= 2 } } @@ -663,10 +655,10 @@ func BenchmarkPrepend(b *testing.B) { func BenchmarkAppend(b *testing.B) { for i := 0; i < b.N; i++ { - agg := newExpoHistogramDataPoint[float64](alice, 1024, 20, false, false) + agg := newExpoHistogramDataPoint[float64](alice, 1024, 20) n := smallestNonZeroNormalFloat64 for range 1024 { - agg.record(n) + agg.record(n, false, false) n *= 2 } } @@ -703,27 +695,22 @@ func BenchmarkExponentialHistogram(b *testing.B) { } func TestSubNormal(t *testing.T) { - want := &expoHistogramDataPoint[float64]{ - attrs: alice, - maxSize: 4, - count: 3, - min: math.SmallestNonzeroFloat64, - max: math.SmallestNonzeroFloat64, - sum: 3 * math.SmallestNonzeroFloat64, - - scale: 20, - posBuckets: expoBuckets{ - startBin: -1126170625, - counts: []uint64{3}, - }, - } - - ehdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) - ehdp.record(math.SmallestNonzeroFloat64) - ehdp.record(math.SmallestNonzeroFloat64) - ehdp.record(math.SmallestNonzeroFloat64) - assert.Equal(t, want, ehdp) + ehdp := newExpoHistogramDataPoint[float64](alice, 4, 20) + ehdp.record(math.SmallestNonzeroFloat64, false, false) + ehdp.record(math.SmallestNonzeroFloat64, false, false) + ehdp.record(math.SmallestNonzeroFloat64, false, false) + + assert.True(t, ehdp.minMax.set.Load()) + assert.Equal(t, math.SmallestNonzeroFloat64, ehdp.minMax.maximum.Load()) + assert.Equal(t, math.SmallestNonzeroFloat64, ehdp.minMax.minimum.Load()) + assert.Equal(t, 3*math.SmallestNonzeroFloat64, ehdp.sum.load()) + expected := &expectedExpoBuckets{ + startBin: -1126170625, + counts: []uint64{3}, + scale: 20, + } + expected.AssertEqualHotCold(t, &ehdp.posBuckets) } func TestExponentialHistogramAggregation(t *testing.T) { @@ -1131,26 +1118,28 @@ func FuzzGetBin(f *testing.F) { 
t.Skip("skipping test for zero") } - p := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) + b := expoBuckets{ + scale: 20, + } // scale range is -10 to 20. - p.scale = (scale%31+31)%31 - 10 - got := p.getBin(v) - if v <= lowerBound(got, p.scale) { + b.scale = (scale%31+31)%31 - 10 + got := b.getBin(v) + if v <= lowerBound(got, b.scale) { t.Errorf( "v=%x scale =%d had bin %d, but was below lower bound %x", v, - p.scale, + b.scale, got, - lowerBound(got, p.scale), + lowerBound(got, b.scale), ) } - if v > lowerBound(got+1, p.scale) { + if v > lowerBound(got+1, b.scale) { t.Errorf( "v=%x scale =%d had bin %d, but was above upper bound %x", v, - p.scale, + b.scale, got, - lowerBound(got+1, p.scale), + lowerBound(got+1, b.scale), ) } }) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index a094519cf6d..b436cebe70e 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -7,151 +7,148 @@ import ( "context" "slices" "sort" - "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -type buckets[N int64 | float64] struct { +type histogramPoint[N int64 | float64] struct { attrs attribute.Set res FilteredExemplarReservoir[N] - - counts []uint64 - count uint64 - total N - min, max N + histogramPointCounters[N] } -// newBuckets returns buckets with n bins. -func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] { - return &buckets[N]{attrs: attrs, counts: make([]uint64, n)} +// histogramPointCounters contains only the atomic counter data, and is used by +// both histogramPoint and hotColdHistogramPoint. +type histogramPointCounters[N int64 | float64] struct { + counts []atomic.Uint64 + total atomicCounter[N] + minMax atomicMinMax[N] } -func (b *buckets[N]) sum(value N) { b.total += value } +func (b *histogramPointCounters[N]) sum(value N) { b.total.add(value) } -func (b *buckets[N]) bin(idx int) { - b.counts[idx]++ - b.count++ +func (b *histogramPointCounters[N]) bin(bounds []float64, value N) { + // This search will return an index in the range [0, len(bounds)], where + // it will return len(bounds) if value is greater than the last element + // of bounds. This aligns with the counts slice in that its length is + // len(bounds)+1, with the last bucket representing: + // (bounds[len(bounds)-1], +∞).
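+ // For example, bounds = [0, 5, 10] give the buckets (-∞, 0], (0, 5], (5, 10], (10, +∞), so a value of 7 increments counts[2].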
+ idx := sort.SearchFloat64s(bounds, float64(value)) + b.counts[idx].Add(1) } -func (b *buckets[N]) minMax(value N) { - if value < b.min { - b.min = value - } else if value > b.max { - b.max = value +func (b *histogramPointCounters[N]) loadCounts() ([]uint64, uint64) { + // TODO (#3047): Making copies for bounds and counts incurs a large + // memory allocation footprint. Alternatives should be explored. + counts := make([]uint64, len(b.counts)) + count := uint64(0) + for i := range counts { + c := b.counts[i].Load() + counts[i] = c + count += c + } + return counts, count +} + +// mergeIntoAndReset merges this set of histogram counter data into another, +// and resets the state of this set of counters. This is used by +// hotColdHistogramPoint to ensure that the cumulative counters continue to +// accumulate after being read. +func (b *histogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Intentional internal control flag + into *histogramPointCounters[N], + noMinMax, noSum bool, +) { + for i := range b.counts { + into.counts[i].Add(b.counts[i].Load()) + b.counts[i].Store(0) + } + + if !noMinMax { + // Do not reset min or max because the cumulative min only ever + // decreases and the cumulative max only ever increases. + + if b.minMax.set.Load() { + into.minMax.Update(b.minMax.minimum.Load()) + into.minMax.Update(b.minMax.maximum.Load()) + } + } + if !noSum { + into.total.add(b.total.load()) + b.total.reset() + } } -// histValues summarizes a set of measurements as an histValues with -// explicitly defined buckets. -type histValues[N int64 | float64] struct { +// deltaHistogram is a histogram whose internal storage is reset when it is +// collected. +type deltaHistogram[N int64 | float64] struct { + hcwg hotColdWaitGroup + hotColdValMap [2]limitedSyncMap + + start time.Time noMinMax bool noSum bool bounds []float64 newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[buckets[N]] - values map[attribute.Distinct]*buckets[N] - valuesMu sync.Mutex } -func newHistValues[N int64 | float64]( - bounds []float64, - noMinMax, noSum bool, - limit int, - r func(attribute.Set) FilteredExemplarReservoir[N], -) *histValues[N] { - // The responsibility of keeping all buckets correctly associated with the - // passed boundaries is ultimately this type's responsibility. Make a copy - // here so we can always guarantee this. Or, in the case of failure, have - // complete control over the fix. - b := slices.Clone(bounds) - slices.Sort(b) - return &histValues[N]{ - noMinMax: noMinMax, - noSum: noSum, - bounds: b, - newRes: r, - limit: newLimiter[buckets[N]](limit), - values: make(map[attribute.Distinct]*buckets[N]), - } -} - -// Aggregate records the measurement value, scoped by attr, and aggregates it -// into a histogram. -func (s *histValues[N]) measure( +func (s *deltaHistogram[N]) measure( ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue, ) { - // This search will return an index in the range [0, len(s.bounds)], where - // it will return len(s.bounds) if value is greater than the last element - // of s.bounds. This aligns with the buckets in that the length of buckets - // is len(s.bounds)+1, with the last bucket representing: - // (s.bounds[len(s.bounds)-1], +∞).
- idx := sort.SearchFloat64s(s.bounds, float64(value)) - - s.valuesMu.Lock() - defer s.valuesMu.Unlock() - - b, ok := s.values[fltrAttr.Equivalent()] - if !ok { - fltrAttr = s.limit.Attributes(fltrAttr, s.values) - // If we overflowed, make sure we add to the existing overflow series - // if it already exists. - b, ok = s.values[fltrAttr.Equivalent()] - if !ok { - // N+1 buckets. For example: - // - // bounds = [0, 5, 10] - // - // Then, - // - // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) - b = newBuckets[N](fltrAttr, len(s.bounds)+1) - b.res = s.newRes(fltrAttr) - - // Ensure min and max are recorded values (not zero), for new buckets. - b.min, b.max = value, value - s.values[fltrAttr.Equivalent()] = b + hotIdx := s.hcwg.start() + defer s.hcwg.done(hotIdx) + h := s.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + hPt := &histogramPoint[N]{ + res: s.newRes(attr), + attrs: attr, + histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, len(s.bounds)+1)}, } - } - b.bin(idx) + return hPt + }).(*histogramPoint[N]) + + h.bin(s.bounds, value) if !s.noMinMax { - b.minMax(value) + h.minMax.Update(value) } if !s.noSum { - b.sum(value) + h.sum(value) } - b.res.Offer(ctx, value, droppedAttr) + h.res.Offer(ctx, value, droppedAttr) } -// newHistogram returns an Aggregator that summarizes a set of measurements as -// an histogram. -func newHistogram[N int64 | float64]( +// newDeltaHistogram returns a histogram that is reset each time it is +// collected. +func newDeltaHistogram[N int64 | float64]( boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], -) *histogram[N] { - return &histogram[N]{ - histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r), - start: now(), +) *deltaHistogram[N] { + // The responsibility of keeping all histogramPoints correctly associated with the + // passed boundaries is ultimately this type's responsibility. Make a copy + // here so we can always guarantee this. Or, in the case of failure, have + // complete control over the fix. + b := slices.Clone(boundaries) + slices.Sort(b) + return &deltaHistogram[N]{ + start: now(), + noMinMax: noMinMax, + noSum: noSum, + bounds: b, + newRes: r, + hotColdValMap: [2]limitedSyncMap{ + {aggLimit: limit}, + {aggLimit: limit}, + }, } } -// histogram summarizes a set of measurements as an histogram with explicitly -// defined buckets. -type histogram[N int64 | float64] struct { - *histValues[N] - - start time.Time -} - -func (s *histogram[N]) delta( +func (s *deltaHistogram[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() @@ -161,39 +158,46 @@ func (s *histogram[N]) delta( h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.DeltaTemporality - s.valuesMu.Lock() - defer s.valuesMu.Unlock() + // Delta temporality always clears values on collection. + readIdx := s.hcwg.swapHotAndWait() // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) - n := len(s.values) + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len.
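+ // Measurements that start after the swap are recorded into the other (hot) map and will be collected next cycle.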
+ n := s.hotColdValMap[readIdx].Len() hDPts := reset(h.DataPoints, n, n) var i int - for _, val := range s.values { + s.hotColdValMap[readIdx].Range(func(_, value any) bool { + val := value.(*histogramPoint[N]) + bucketCounts, count := val.loadCounts() hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t - hDPts[i].Count = val.count + hDPts[i].Count = count hDPts[i].Bounds = bounds - hDPts[i].BucketCounts = val.counts + hDPts[i].BucketCounts = bucketCounts if !s.noSum { - hDPts[i].Sum = val.total + hDPts[i].Sum = val.total.load() } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + if val.minMax.set.Load() { + hDPts[i].Min = metricdata.NewExtrema(val.minMax.minimum.Load()) + hDPts[i].Max = metricdata.NewExtrema(val.minMax.maximum.Load()) + } } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ - } + return true + }) // Unused attribute sets do not report. - clear(s.values) + s.hotColdValMap[readIdx].Clear() // The delta collection cycle resets. s.start = t @@ -203,7 +207,86 @@ func (s *histogram[N]) delta( return n } -func (s *histogram[N]) cumulative( +// cumulativeHistogram summarizes a set of measurements as a histogram with +// explicitly defined bucket boundaries. +type cumulativeHistogram[N int64 | float64] struct { + values limitedSyncMap + + start time.Time + noMinMax bool + noSum bool + bounds []float64 + newRes func(attribute.Set) FilteredExemplarReservoir[N] +} + +// newCumulativeHistogram returns a histogram that accumulates measurements +// into a histogram data structure. It is never reset. +func newCumulativeHistogram[N int64 | float64]( + boundaries []float64, + noMinMax, noSum bool, + limit int, + r func(attribute.Set) FilteredExemplarReservoir[N], +) *cumulativeHistogram[N] { + // The responsibility of keeping all histogramPoints correctly associated with the + // passed boundaries is ultimately this type's responsibility. Make a copy + // here so we can always guarantee this. Or, in the case of failure, have + // complete control over the fix.
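+ // For example, a caller that later mutates the boundaries slice it passed in cannot affect this aggregator.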
+ b := slices.Clone(boundaries) + slices.Sort(b) + return &cumulativeHistogram[N]{ + start: now(), + noMinMax: noMinMax, + noSum: noSum, + bounds: b, + newRes: r, + values: limitedSyncMap{aggLimit: limit}, + } +} + +type hotColdHistogramPoint[N int64 | float64] struct { + hcwg hotColdWaitGroup + hotColdPoint [2]histogramPointCounters[N] + + attrs attribute.Set + res FilteredExemplarReservoir[N] +} + +func (s *cumulativeHistogram[N]) measure( + ctx context.Context, + value N, + fltrAttr attribute.Set, + droppedAttr []attribute.KeyValue, +) { + h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + hPt := &hotColdHistogramPoint[N]{ + res: s.newRes(attr), + attrs: attr, + hotColdPoint: [2]histogramPointCounters[N]{ + { + counts: make([]atomic.Uint64, len(s.bounds)+1), + }, + { + counts: make([]atomic.Uint64, len(s.bounds)+1), + }, + }, + } + return hPt + }).(*hotColdHistogramPoint[N]) + + hotIdx := h.hcwg.start() + defer h.hcwg.done(hotIdx) + + h.hotColdPoint[hotIdx].bin(s.bounds, value) + if !s.noMinMax { + h.hotColdPoint[hotIdx].minMax.Update(value) + } + if !s.noSum { + h.hotColdPoint[hotIdx].sum(value) + } + h.res.Offer(ctx, value, droppedAttr) +} + +func (s *cumulativeHistogram[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() @@ -213,50 +296,57 @@ func (s *histogram[N]) cumulative( h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.CumulativeTemporality - s.valuesMu.Lock() - defer s.valuesMu.Unlock() - // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) - n := len(s.values) - hDPts := reset(h.DataPoints, n, n) + // Values are being concurrently written while we iterate, so only use the + // current length for capacity. + hDPts := reset(h.DataPoints, 0, s.values.Len()) var i int - for _, val := range s.values { - hDPts[i].Attributes = val.attrs - hDPts[i].StartTime = s.start - hDPts[i].Time = t - hDPts[i].Count = val.count - hDPts[i].Bounds = bounds - - // The HistogramDataPoint field values returned need to be copies of - // the buckets value as we will keep updating them. - // - // TODO (#3047): Making copies for bounds and counts incurs a large - // memory allocation footprint. Alternatives should be explored. - hDPts[i].BucketCounts = slices.Clone(val.counts) + s.values.Range(func(_, value any) bool { + val := value.(*hotColdHistogramPoint[N]) + // Swap, observe, and reset the point. + readIdx := val.hcwg.swapHotAndWait() + bucketCounts, count := val.hotColdPoint[readIdx].loadCounts() + newPt := metricdata.HistogramDataPoint[N]{ + Attributes: val.attrs, + StartTime: s.start, + Time: t, + Count: count, + Bounds: bounds, + // The HistogramDataPoint field values returned need to be copies of + // the histogramPoint values as we will keep updating them. + BucketCounts: bucketCounts, + } if !s.noSum { - hDPts[i].Sum = val.total + newPt.Sum = val.hotColdPoint[readIdx].total.load() } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + if val.hotColdPoint[readIdx].minMax.set.Load() { + newPt.Min = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.minimum.Load()) + newPt.Max = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.maximum.Load()) + } } + // Once we've read the point, merge it back into the hot histogram + // point since it is cumulative.
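+ // Otherwise, measurements recorded before the swap would be lost from subsequent collections.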
+ hotIdx := (readIdx + 1) % 2 + val.hotColdPoint[readIdx].mergeIntoAndReset(&val.hotColdPoint[hotIdx], s.noMinMax, s.noSum) - collectExemplars(&hDPts[i].Exemplars, val.res.Collect) + collectExemplars(&newPt.Exemplars, val.res.Collect) + hDPts = append(hDPts, newPt) i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. - } + return true + }) h.DataPoints = hDPts *dest = h - return n + return i } diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 6e0f3948de0..8f12e311794 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" "sort" + "sync/atomic" "testing" "time" @@ -22,6 +23,14 @@ var ( noMinMax = false ) +func newHistogramPoint[N int64 | float64](attrs attribute.Set, n int) *histogramPoint[N] { + hPt := &histogramPoint[N]{ + attrs: attrs, + histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, n)}, + } + return hPt +} + func TestHistogram(t *testing.T) { c := new(clock) t.Cleanup(c.Register()) @@ -337,21 +346,29 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](alice, 3) - assertB := func(counts []uint64, count uint64, mi, ma N) { + b := newHistogramPoint[N](alice, 3) + assertB := func(expectedBucketCounts []uint64, expectedCount uint64, mi, ma N) { t.Helper() - assert.Equal(t, counts, b.counts) - assert.Equal(t, count, b.count) - assert.Equal(t, mi, b.min) - assert.Equal(t, ma, b.max) + bucketCounts, count := b.loadCounts() + assert.Equal(t, expectedBucketCounts, bucketCounts) + assert.Equal(t, expectedCount, count) + if mi != 0 { + assert.True(t, b.minMax.set.Load()) + assert.Equal(t, mi, b.minMax.minimum.Load()) + } + if ma != 0 { + assert.True(t, b.minMax.set.Load()) + assert.Equal(t, ma, b.minMax.maximum.Load()) + } } + bounds := []float64{0, 2, 4} assertB([]uint64{0, 0, 0}, 0, 0, 0) - b.bin(1) - b.minMax(2) + b.bin(bounds, 1) + b.minMax.Update(2) assertB([]uint64{0, 1, 0}, 1, 0, 2) - b.bin(0) - b.minMax(-1) + b.bin(bounds, -1) + b.minMax.Update(-1) assertB([]uint64{1, 1, 0}, 2, -1, 2) } } @@ -363,18 +380,18 @@ func TestBucketsSum(t *testing.T) { func testBucketsSum[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](alice, 3) + b := newHistogramPoint[N](alice, 3) var want N - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) b.sum(2) want = 2 - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) b.sum(-1) want = 1 - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) } } @@ -383,7 +400,7 @@ func TestHistogramImmutableBounds(t *testing.T) { cpB := make([]float64, len(b)) copy(cpB, b) - h := newHistogram[int64](b, false, false, 0, dropExemplars[int64]) + h := newCumulativeHistogram[int64](b, false, false, 0, dropExemplars[int64]) require.Equal(t, cpB, h.bounds) b[0] = 10 @@ -392,29 +409,41 @@ func TestHistogramImmutableBounds(t *testing.T) { h.measure(t.Context(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - h.cumulative(&data) + h.collect(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] hdp.Bounds[1] 
= 10 assert.Equal(t, cpB, h.bounds, "modifying the Aggregation bounds should not change the bounds") } func TestCumulativeHistogramImmutableCounts(t *testing.T) { - h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) + h := newCumulativeHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) h.measure(t.Context(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - h.cumulative(&data) + h.collect(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] - require.Equal(t, hdp.BucketCounts, h.values[alice.Equivalent()].counts) + hPt, ok := h.values.Load(alice.Equivalent()) + require.True(t, ok) + hcHistPt := hPt.(*hotColdHistogramPoint[int64]) + readIdx := hcHistPt.hcwg.swapHotAndWait() + bucketCounts, _ := hcHistPt.hotColdPoint[readIdx].loadCounts() + require.Equal(t, hdp.BucketCounts, bucketCounts) + hotIdx := (readIdx + 1) % 2 + hcHistPt.hotColdPoint[readIdx].mergeIntoAndReset(&hcHistPt.hotColdPoint[hotIdx], noMinMax, false) cpCounts := make([]uint64, len(hdp.BucketCounts)) copy(cpCounts, hdp.BucketCounts) hdp.BucketCounts[0] = 10 + hPt, ok = h.values.Load(alice.Equivalent()) + require.True(t, ok) + hcHistPt = hPt.(*hotColdHistogramPoint[int64]) + readIdx = hcHistPt.hcwg.swapHotAndWait() + bucketCounts, _ = hcHistPt.hotColdPoint[readIdx].loadCounts() assert.Equal( t, cpCounts, - h.values[alice.Equivalent()].counts, + bucketCounts, "modifying the Aggregator bucket counts should not change the Aggregator", ) } @@ -424,28 +453,28 @@ func TestDeltaHistogramReset(t *testing.T) { now = func() time.Time { return y2k } t.Cleanup(func() { now = orig }) - h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) + h := newDeltaHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - require.Equal(t, 0, h.delta(&data)) + require.Equal(t, 0, h.collect(&data)) require.Empty(t, data.(metricdata.Histogram[int64]).DataPoints) h.measure(t.Context(), 1, alice, nil) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1, now(), now())} - h.delta(&data) + h.collect(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) // The attr set should be forgotten once Aggregations is called. expect.DataPoints = nil - assert.Equal(t, 0, h.delta(&data)) + assert.Equal(t, 0, h.collect(&data)) assert.Empty(t, data.(metricdata.Histogram[int64]).DataPoints) // Aggregating another set should not affect the original (alice). h.measure(t.Context(), 1, bob, nil) expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1, now(), now())} - h.delta(&data) + h.collect(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) } diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 3e2ed741505..79f3c8f938f 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -5,117 +5,160 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" - "sync" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// datapoint is timestamped measurement data. -type datapoint[N int64 | float64] struct { +// lastValuePoint is timestamped measurement data. 
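+// The value field is updated atomically so concurrent measure calls never tear the stored measurement.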
+type lastValuePoint[N int64 | float64] struct { attrs attribute.Set - value N + value atomicN[N] res FilteredExemplarReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { - return &lastValue[N]{ +// lastValueMap stores the last measurement made for each attribute set. +type lastValueMap[N int64 | float64] struct { + newRes func(attribute.Set) FilteredExemplarReservoir[N] + values limitedSyncMap +} + +func (s *lastValueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { + lv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + return &lastValuePoint[N]{ + res: s.newRes(attr), + attrs: attr, + } + }).(*lastValuePoint[N]) + + lv.value.Store(value) + lv.res.Offer(ctx, value, droppedAttr) +} + +func newDeltaLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *deltaLastValue[N] { + return &deltaLastValue[N]{ newRes: r, - limit: newLimiter[datapoint[N]](limit), - values: make(map[attribute.Distinct]*datapoint[N]), start: now(), + hotColdValMap: [2]lastValueMap[N]{ + { + values: limitedSyncMap{aggLimit: limit}, + newRes: r, + }, + { + values: limitedSyncMap{aggLimit: limit}, + newRes: r, + }, + }, } } -// lastValue summarizes a set of measurements as the last one made. -type lastValue[N int64 | float64] struct { - sync.Mutex - +// deltaLastValue summarizes a set of measurements as the last one made, and +// clears its state after each collection. +type deltaLastValue[N int64 | float64] struct { newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[datapoint[N]] - values map[attribute.Distinct]*datapoint[N] start time.Time -} - -func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - s.Lock() - defer s.Unlock() - - d, ok := s.values[fltrAttr.Equivalent()] - if !ok { - fltrAttr = s.limit.Attributes(fltrAttr, s.values) - d = &datapoint[N]{ - res: s.newRes(fltrAttr), - attrs: fltrAttr, - } - } - d.value = value - d.res.Offer(ctx, value, droppedAttr) + hcwg hotColdWaitGroup + hotColdValMap [2]lastValueMap[N] +} - s.values[fltrAttr.Equivalent()] = d +func (s *deltaLastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { + hotIdx := s.hcwg.start() + defer s.hcwg.done(hotIdx) + s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr) } -func (s *lastValue[N]) delta( +func (s *deltaLastValue[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() + n := s.copyAndClearDpts(dest, t) + // Update start time for delta temporality. + s.start = t + return n +} + +// copyAndClearDpts copies the lastValuePoints held by s into dest. The number of lastValuePoints +// copied is returned. +func (s *deltaLastValue[N]) copyAndClearDpts(dest *metricdata.Aggregation, t time.Time) int { // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) + // Delta temporality always clears values on collection. + readIdx := s.hcwg.swapHotAndWait() + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len.
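+ // Concurrent measure calls are now directed at the other map and will be seen by the next collection.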
+ n := s.hotColdValMap[readIdx].values.Len() + dPts := reset(gData.DataPoints, n, n) - s.Lock() - defer s.Unlock() - - n := s.copyDpts(&gData.DataPoints, t) + var i int + s.hotColdValMap[readIdx].values.Range(func(key, value any) bool { + v := value.(*lastValuePoint[N]) + dPts[i].Attributes = v.attrs + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = v.value.Load() + collectExemplars[N](&dPts[i].Exemplars, v.res.Collect) + i++ + return true + }) + gData.DataPoints = dPts // Do not report stale values. - clear(s.values) - // Update start time for delta temporality. - s.start = t - + s.hotColdValMap[readIdx].values.Clear() *dest = gData - return n } -func (s *lastValue[N]) cumulative( +// cumulativeLastValue summarizes a set of measurements as the last one made. +// Unlike deltaLastValue, its state is not cleared on collection. +type cumulativeLastValue[N int64 | float64] struct { + lastValueMap[N] + start time.Time +} + +func newCumulativeLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *cumulativeLastValue[N] { + return &cumulativeLastValue[N]{ + lastValueMap: lastValueMap[N]{ + values: limitedSyncMap{aggLimit: limit}, + newRes: r, + }, + start: now(), + } +} + +func (s *cumulativeLastValue[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) - s.Lock() - defer s.Unlock() + // Values are being concurrently written while we iterate, so only use the + // current length for capacity. + dPts := reset(gData.DataPoints, 0, s.values.Len()) - n := s.copyDpts(&gData.DataPoints, t) + var i int + s.values.Range(func(key, value any) bool { + v := value.(*lastValuePoint[N]) + newPt := metricdata.DataPoint[N]{ + Attributes: v.attrs, + StartTime: s.start, + Time: t, + Value: v.value.Load(), + } + collectExemplars[N](&newPt.Exemplars, v.res.Collect) + dPts = append(dPts, newPt) + i++ + return true + }) + gData.DataPoints = dPts // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. *dest = gData - return n -} - -// copyDpts copies the datapoints held by s into dest. The number of datapoints -// copied is returned. -func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int { - n := len(s.values) - *dest = reset(*dest, n, n) - - var i int - for _, v := range s.values { - (*dest)[i].Attributes = v.attrs - (*dest)[i].StartTime = s.start - (*dest)[i].Time = t - (*dest)[i].Value = v.value - collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) - i++ - } - return n + return i } // newPrecomputedLastValue returns an aggregator that summarizes a set of @@ -124,51 +167,23 @@ func newPrecomputedLastValue[N int64 | float64]( limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *precomputedLastValue[N] { - return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} + return &precomputedLastValue[N]{deltaLastValue: newDeltaLastValue[N](limit, r)} } // precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct { - *lastValue[N] + *deltaLastValue[N] } func (s *precomputedLastValue[N]) delta( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { - t := now() - // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of - // the DataPoints is missed (better luck next time). - gData, _ := (*dest).(metricdata.Gauge[N]) - - s.Lock() - defer s.Unlock() - - n := s.copyDpts(&gData.DataPoints, t) - // Do not report stale values. - clear(s.values) - // Update start time for delta temporality. - s.start = t - - *dest = gData - - return n + return s.collect(dest) } func (s *precomputedLastValue[N]) cumulative( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { - t := now() - // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of - // the DataPoints is missed (better luck next time). - gData, _ := (*dest).(metricdata.Gauge[N]) - - s.Lock() - defer s.Unlock() - - n := s.copyDpts(&gData.DataPoints, t) - // Do not report stale values. - clear(s.values) - *dest = gData - - return n + // Do not reset the start time. + return s.copyAndClearDpts(dest, now()) } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 81690855114..66cb68085fd 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -17,12 +17,12 @@ type sumValue[N int64 | float64] struct { attrs attribute.Set } -type valueMap[N int64 | float64] struct { +type sumValueMap[N int64 | float64] struct { values limitedSyncMap newRes func(attribute.Set) FilteredExemplarReservoir[N] } -func (s *valueMap[N]) measure( +func (s *sumValueMap[N]) measure( ctx context.Context, value N, fltrAttr attribute.Set, @@ -52,7 +52,7 @@ func newDeltaSum[N int64 | float64]( return &deltaSum[N]{ monotonic: monotonic, start: now(), - hotColdValMap: [2]valueMap[N]{ + hotColdValMap: [2]sumValueMap[N]{ { values: limitedSyncMap{aggLimit: limit}, newRes: r, @@ -71,7 +71,7 @@ type deltaSum[N int64 | float64] struct { start time.Time hcwg hotColdWaitGroup - hotColdValMap [2]valueMap[N] + hotColdValMap [2]sumValueMap[N] } func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -130,7 +130,7 @@ func newCumulativeSum[N int64 | float64]( return &cumulativeSum[N]{ monotonic: monotonic, start: now(), - valueMap: valueMap[N]{ + sumValueMap: sumValueMap[N]{ values: limitedSyncMap{aggLimit: limit}, newRes: r, }, @@ -142,7 +142,7 @@ type cumulativeSum[N int64 | float64] struct { monotonic bool start time.Time - valueMap[N] + sumValueMap[N] } func (s *cumulativeSum[N]) collect(