diff --git a/CHANGELOG.md b/CHANGELOG.md index d0393e1a4ad..50070f1e8ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Include W3C TraceFlags (bits 0–7) in the OTLP `Span.Flags` field in `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracehttp` and `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracegrpc`. (#7438) - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) -- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) +- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427, #7474) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 2b60410801b..afaefb63f21 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -126,12 +126,13 @@ func (b Builder[N]) ExplicitBucketHistogram( boundaries []float64, noMinMax, noSum bool, ) (Measure[N], ComputeAggregation) { - h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: - return b.filter(h.measure), h.delta + h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect default: - return b.filter(h.measure), h.cumulative + h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect } } diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go index 0fa6d3c6fa8..eb69e965079 100644 --- a/sdk/metric/internal/aggregate/atomic.go +++ b/sdk/metric/internal/aggregate/atomic.go @@ -51,6 +51,97 @@ func (n *atomicCounter[N]) add(value N) { } } +// reset resets the internal state, and is not safe to call concurrently. +func (n *atomicCounter[N]) reset() { + n.nFloatBits.Store(0) + n.nInt.Store(0) +} + +// atomicN is a generic atomic number value. +type atomicN[N int64 | float64] struct { + val atomic.Uint64 +} + +func (a *atomicN[N]) Load() (value N) { + v := a.val.Load() + switch any(value).(type) { + case int64: + value = N(v) + case float64: + value = N(math.Float64frombits(v)) + default: + panic("unsupported type") + } + return value +} + +func (a *atomicN[N]) Store(v N) { + var val uint64 + switch any(v).(type) { + case int64: + val = uint64(v) + case float64: + val = math.Float64bits(float64(v)) + default: + panic("unsupported type") + } + a.val.Store(val) +} + +func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool { + var o, n uint64 + switch any(oldN).(type) { + case int64: + o, n = uint64(oldN), uint64(newN) + case float64: + o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN)) + default: + panic("unsupported type") + } + return a.val.CompareAndSwap(o, n) +} + +type atomicMinMax[N int64 | float64] struct { + minimum, maximum atomicN[N] + set atomic.Bool + mu sync.Mutex +} + +// init returns true if the value was used to initialize min and max. 
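+// init takes the mutex and re-checks the set flag, so exactly one caller
+// seeds both minimum and maximum with its value. The flag is only stored
+// after the seed values are written, and callers that lose this race fall
+// through to the CAS loops in Update.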
+func (s *atomicMinMax[N]) init(val N) bool { + s.mu.Lock() + defer s.mu.Unlock() + if !s.set.Load() { + defer s.set.Store(true) + s.minimum.Store(val) + s.maximum.Store(val) + return true + } + return false +} + +func (s *atomicMinMax[N]) Update(val N) { + if !s.set.Load() && s.init(val) { + return + } + + old := s.minimum.Load() + for val < old { + if s.minimum.CompareAndSwap(old, val) { + return + } + old = s.minimum.Load() + } + + old = s.maximum.Load() + for old < val { + if s.maximum.CompareAndSwap(old, val) { + return + } + old = s.maximum.Load() + } +} + // hotColdWaitGroup is a synchronization primitive which enables lockless // writes for concurrent writers and enables a reader to acquire exclusive // access to a snapshot of state including only completed operations. diff --git a/sdk/metric/internal/aggregate/atomic_test.go b/sdk/metric/internal/aggregate/atomic_test.go index 52f053248d7..174ef764f21 100644 --- a/sdk/metric/internal/aggregate/atomic_test.go +++ b/sdk/metric/internal/aggregate/atomic_test.go @@ -52,6 +52,33 @@ func TestAtomicSumAddIntConcurrentSafe(t *testing.T) { assert.Equal(t, int64(15), aSum.load()) } +func BenchmarkAtomicCounter(b *testing.B) { + b.Run("Int64", benchmarkAtomicCounter[int64]) + b.Run("Float64", benchmarkAtomicCounter[float64]) +} + +func benchmarkAtomicCounter[N int64 | float64](b *testing.B) { + b.Run("add", func(b *testing.B) { + var a atomicCounter[N] + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + a.add(2) + } + }) + }) + b.Run("load", func(b *testing.B) { + var a atomicCounter[N] + a.add(2) + var v N + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + v = a.load() + } + }) + assert.Equal(b, N(2), v) + }) +} + func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { var wg sync.WaitGroup hcwg := &hotColdWaitGroup{} @@ -76,3 +103,150 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { } wg.Wait() } + +func TestAtomicN(t *testing.T) { + t.Run("Int64", testAtomicN[int64]) + t.Run("Float64", testAtomicN[float64]) +} + +func testAtomicN[N int64 | float64](t *testing.T) { + var v atomicN[N] + assert.Equal(t, N(0), v.Load()) + assert.True(t, v.CompareAndSwap(0, 6)) + assert.Equal(t, N(6), v.Load()) + assert.False(t, v.CompareAndSwap(0, 6)) + v.Store(22) + assert.Equal(t, N(22), v.Load()) +} + +func TestAtomicNConcurrentSafe(t *testing.T) { + t.Run("Int64", testAtomicNConcurrentSafe[int64]) + t.Run("Float64", testAtomicNConcurrentSafe[float64]) +} + +func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) { + var wg sync.WaitGroup + var v atomicN[N] + + for range 2 { + wg.Add(1) + go func() { + defer wg.Done() + got := v.Load() + assert.Equal(t, int64(0), int64(got)%6) + }() + wg.Add(1) + go func() { + defer wg.Done() + v.Store(12) + }() + wg.Add(1) + go func() { + defer wg.Done() + v.CompareAndSwap(0, 6) + }() + } + wg.Wait() +} + +func BenchmarkAtomicN(b *testing.B) { + b.Run("Int64", benchmarkAtomicN[int64]) + b.Run("Float64", benchmarkAtomicN[float64]) +} + +func benchmarkAtomicN[N int64 | float64](b *testing.B) { + b.Run("Load", func(b *testing.B) { + var a atomicN[N] + a.Store(2) + var v N + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + v = a.Load() + } + }) + assert.Equal(b, N(2), v) + }) + b.Run("Store", func(b *testing.B) { + var a atomicN[N] + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + a.Store(3) + } + }) + }) + b.Run("CompareAndSwap", func(b *testing.B) { + var a atomicN[N] + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + // Make sure we swap back and forth, 
in case that matters.
+				if i%2 == 0 {
+					a.CompareAndSwap(0, 1)
+				} else {
+					a.CompareAndSwap(1, 0)
+				}
+				i++
+			}
+		})
+	})
+}
+
+func TestAtomicMinMaxConcurrentSafe(t *testing.T) {
+	t.Run("Int64", testAtomicMinMaxConcurrentSafe[int64])
+	t.Run("Float64", testAtomicMinMaxConcurrentSafe[float64])
+}
+
+func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) {
+	var wg sync.WaitGroup
+	var minMax atomicMinMax[N]
+
+	assert.False(t, minMax.set.Load())
+	for _, i := range []float64{2, 4, 6, 8, -3, 0, 8, 0} {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			minMax.Update(N(i))
+		}()
+	}
+	wg.Wait()
+
+	assert.True(t, minMax.set.Load())
+	assert.Equal(t, N(-3), minMax.minimum.Load())
+	assert.Equal(t, N(8), minMax.maximum.Load())
+}
+
+func BenchmarkAtomicMinMax(b *testing.B) {
+	b.Run("Int64", benchmarkAtomicMinMax[int64])
+	b.Run("Float64", benchmarkAtomicMinMax[float64])
+}
+
+func benchmarkAtomicMinMax[N int64 | float64](b *testing.B) {
+	b.Run("UpdateIncreasing", func(b *testing.B) {
+		var a atomicMinMax[N]
+		b.RunParallel(func(pb *testing.PB) {
+			i := 0
+			for pb.Next() {
+				a.Update(N(i))
+				i++
+			}
+		})
+	})
+	b.Run("UpdateDecreasing", func(b *testing.B) {
+		var a atomicMinMax[N]
+		b.RunParallel(func(pb *testing.PB) {
+			i := 0
+			for pb.Next() {
+				a.Update(N(i))
+				i--
+			}
+		})
+	})
+	b.Run("UpdateConstant", func(b *testing.B) {
+		var a atomicMinMax[N]
+		b.RunParallel(func(pb *testing.PB) {
+			for pb.Next() {
+				a.Update(N(5))
+			}
+		})
+	})
+}
diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go
index a094519cf6d..421325fb728 100644
--- a/sdk/metric/internal/aggregate/histogram.go
+++ b/sdk/metric/internal/aggregate/histogram.go
@@ -7,151 +7,169 @@ import (
 	"context"
 	"slices"
 	"sort"
-	"sync"
+	"sync/atomic"
 	"time"

 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

-type buckets[N int64 | float64] struct {
+// histogramPoint is a single histogram point, used in delta aggregations.
+type histogramPoint[N int64 | float64] struct {
 	attrs attribute.Set
 	res   FilteredExemplarReservoir[N]
-
-	counts   []uint64
-	count    uint64
-	total    N
-	min, max N
+	histogramPointCounters[N]
 }

-// newBuckets returns buckets with n bins.
-func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
-	return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
+// hotColdHistogramPoint holds a hot and a cold set of histogram point
+// counters, used in cumulative aggregations.
+type hotColdHistogramPoint[N int64 | float64] struct {
+	hcwg         hotColdWaitGroup
+	hotColdPoint [2]histogramPointCounters[N]
+
+	attrs attribute.Set
+	res   FilteredExemplarReservoir[N]
 }

-func (b *buckets[N]) sum(value N) { b.total += value }
+// histogramPointCounters contains only the atomic counter data, and is used by
+// both histogramPoint and hotColdHistogramPoint.
+type histogramPointCounters[N int64 | float64] struct {
+	counts []atomic.Uint64
+	total  atomicCounter[N]
+	minMax atomicMinMax[N]
+}

-func (b *buckets[N]) bin(idx int) {
-	b.counts[idx]++
-	b.count++
+func (b *histogramPointCounters[N]) loadCountsInto(into *[]uint64) uint64 {
+	// TODO (#3047): Making copies for counts incurs a large
+	// memory allocation footprint. Alternatives should be explored.
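+	// Each bucket is loaded with an atomic load, and the point count is
+	// derived by summing the buckets rather than being tracked as a
+	// separate field.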
+	counts := reset(*into, len(b.counts), len(b.counts))
+	count := uint64(0)
+	for i := range b.counts {
+		c := b.counts[i].Load()
+		counts[i] = c
+		count += c
+	}
+	*into = counts
+	return count
 }

-func (b *buckets[N]) minMax(value N) {
-	if value < b.min {
-		b.min = value
-	} else if value > b.max {
-		b.max = value
-	}
+// mergeIntoAndReset merges this set of histogram counter data into another,
+// and resets the state of this set of counters. This is used by
+// hotColdHistogramPoint to ensure that the cumulative counters continue to
+// accumulate after being read.
+func (b *histogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Intentional internal control flag
+	into *histogramPointCounters[N],
+	noMinMax, noSum bool,
+) {
+	for i := range b.counts {
+		into.counts[i].Add(b.counts[i].Load())
+		b.counts[i].Store(0)
+	}
+
+	if !noMinMax {
+		// Do not reset min or max because cumulative min and max only ever grow
+		// smaller or larger respectively.
+
+		if b.minMax.set.Load() {
+			into.minMax.Update(b.minMax.minimum.Load())
+			into.minMax.Update(b.minMax.maximum.Load())
+		}
+	}
+	if !noSum {
+		into.total.add(b.total.load())
+		b.total.reset()
+	}
 }

-// histValues summarizes a set of measurements as an histValues with
-// explicitly defined buckets.
-type histValues[N int64 | float64] struct {
+// deltaHistogram is a histogram whose internal storage is reset when it is
+// collected.
+//
+// deltaHistogram's measure is implemented without locking, even when called
+// concurrently with collect. This is done by maintaining two separate maps:
+// one "hot" which is concurrently updated by measure(), and one "cold", which
+// is read and reset by collect(). The [hotColdWaitGroup] allows collect() to
+// swap the hot and cold maps, and wait for updates to the cold map to complete
+// prior to reading. deltaHistogram swaps and clears complete maps so that
+// unused attribute sets do not report in subsequent collect() calls.
+type deltaHistogram[N int64 | float64] struct {
+	hcwg          hotColdWaitGroup
+	hotColdValMap [2]limitedSyncMap
+
+	start    time.Time
 	noMinMax bool
 	noSum    bool
 	bounds   []float64
 	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	limit    limiter[buckets[N]]
-	values   map[attribute.Distinct]*buckets[N]
-	valuesMu sync.Mutex
 }

-func newHistValues[N int64 | float64](
-	bounds []float64,
-	noMinMax, noSum bool,
-	limit int,
-	r func(attribute.Set) FilteredExemplarReservoir[N],
-) *histValues[N] {
-	// The responsibility of keeping all buckets correctly associated with the
-	// passed boundaries is ultimately this type's responsibility. Make a copy
-	// here so we can always guarantee this. Or, in the case of failure, have
-	// complete control over the fix.
-	b := slices.Clone(bounds)
-	slices.Sort(b)
-	return &histValues[N]{
-		noMinMax: noMinMax,
-		noSum:    noSum,
-		bounds:   b,
-		newRes:   r,
-		limit:    newLimiter[buckets[N]](limit),
-		values:   make(map[attribute.Distinct]*buckets[N]),
-	}
-}
-
-// Aggregate records the measurement value, scoped by attr, and aggregates it
-// into a histogram.
-func (s *histValues[N]) measure(
+func (s *deltaHistogram[N]) measure(
 	ctx context.Context,
 	value N,
 	fltrAttr attribute.Set,
 	droppedAttr []attribute.KeyValue,
 ) {
-	// This search will return an index in the range [0, len(s.bounds)], where
-	// it will return len(s.bounds) if value is greater than the last element
-	// of s.bounds. This aligns with the buckets in that the length of buckets
-	// is len(s.bounds)+1, with the last bucket representing:
-	// (s.bounds[len(s.bounds)-1], +∞).
-	idx := sort.SearchFloat64s(s.bounds, float64(value))
-
-	s.valuesMu.Lock()
-	defer s.valuesMu.Unlock()
-
-	b, ok := s.values[fltrAttr.Equivalent()]
-	if !ok {
-		fltrAttr = s.limit.Attributes(fltrAttr, s.values)
-		// If we overflowed, make sure we add to the existing overflow series
-		// if it already exists.
-		b, ok = s.values[fltrAttr.Equivalent()]
-		if !ok {
+	hotIdx := s.hcwg.start()
+	defer s.hcwg.done(hotIdx)
+	h := s.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
+		hPt := &histogramPoint[N]{
+			res:   s.newRes(attr),
+			attrs: attr,
 			// N+1 buckets. For example:
 			//
 			//   bounds = [0, 5, 10]
 			//
 			// Then,
 			//
-			//   buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
-			b = newBuckets[N](fltrAttr, len(s.bounds)+1)
-			b.res = s.newRes(fltrAttr)
-
-			// Ensure min and max are recorded values (not zero), for new buckets.
-			b.min, b.max = value, value
-			s.values[fltrAttr.Equivalent()] = b
+			//   counts = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
+			histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, len(s.bounds)+1)},
 		}
-	}
-	b.bin(idx)
+		return hPt
+	}).(*histogramPoint[N])
+
+	// This search will return an index in the range [0, len(s.bounds)], where
+	// it will return len(s.bounds) if value is greater than the last element
+	// of s.bounds. This aligns with the counts in that the length of counts
+	// is len(s.bounds)+1, with the last bucket representing:
+	// (s.bounds[len(s.bounds)-1], +∞).
+	idx := sort.SearchFloat64s(s.bounds, float64(value))
+	h.counts[idx].Add(1)
 	if !s.noMinMax {
-		b.minMax(value)
+		h.minMax.Update(value)
 	}
 	if !s.noSum {
-		b.sum(value)
+		h.total.add(value)
 	}
-	b.res.Offer(ctx, value, droppedAttr)
+	h.res.Offer(ctx, value, droppedAttr)
 }

-// newHistogram returns an Aggregator that summarizes a set of measurements as
-// an histogram.
-func newHistogram[N int64 | float64](
+// newDeltaHistogram returns a histogram that is reset each time it is
+// collected.
+func newDeltaHistogram[N int64 | float64](
 	boundaries []float64,
 	noMinMax, noSum bool,
 	limit int,
 	r func(attribute.Set) FilteredExemplarReservoir[N],
-) *histogram[N] {
-	return &histogram[N]{
-		histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r),
-		start:      now(),
+) *deltaHistogram[N] {
+	// The responsibility of keeping all histogram points correctly associated with the
+	// passed boundaries is ultimately this type's responsibility. Make a copy
+	// here so we can always guarantee this. Or, in the case of failure, have
+	// complete control over the fix.
+	b := slices.Clone(boundaries)
+	slices.Sort(b)
+	return &deltaHistogram[N]{
+		start:    now(),
+		noMinMax: noMinMax,
+		noSum:    noSum,
+		bounds:   b,
+		newRes:   r,
+		hotColdValMap: [2]limitedSyncMap{
+			{aggLimit: limit},
+			{aggLimit: limit},
+		},
 	}
 }

-// histogram summarizes a set of measurements as an histogram with explicitly
-// defined buckets.
-type histogram[N int64 | float64] struct {
-	*histValues[N]
-
-	start time.Time
-}
-
-func (s *histogram[N]) delta(
+func (s *deltaHistogram[N]) collect(
 	dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
 ) int {
 	t := now()
@@ -161,39 +179,46 @@ func (s *histogram[N]) delta(
 	h, _ := (*dest).(metricdata.Histogram[N])
 	h.Temporality = metricdata.DeltaTemporality

-	s.valuesMu.Lock()
-	defer s.valuesMu.Unlock()
+	// delta always clears values on collection
+	readIdx := s.hcwg.swapHotAndWait()

 	// Do not allow modification of our copy of bounds.
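+	// Every data point below shares this single clone, so a consumer that
+	// mutates the returned Bounds cannot corrupt s.bounds.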
 	bounds := slices.Clone(s.bounds)

-	n := len(s.values)
+	// The len will not change while we iterate over values, since we waited
+	// for all writes to the cold values (and its len) to finish.
+	n := s.hotColdValMap[readIdx].Len()
 	hDPts := reset(h.DataPoints, n, n)

 	var i int
-	for _, val := range s.values {
+	s.hotColdValMap[readIdx].Range(func(_, value any) bool {
+		val := value.(*histogramPoint[N])
+
+		count := val.loadCountsInto(&hDPts[i].BucketCounts)
 		hDPts[i].Attributes = val.attrs
 		hDPts[i].StartTime = s.start
 		hDPts[i].Time = t
-		hDPts[i].Count = val.count
+		hDPts[i].Count = count
 		hDPts[i].Bounds = bounds
-		hDPts[i].BucketCounts = val.counts

 		if !s.noSum {
-			hDPts[i].Sum = val.total
+			hDPts[i].Sum = val.total.load()
 		}

 		if !s.noMinMax {
-			hDPts[i].Min = metricdata.NewExtrema(val.min)
-			hDPts[i].Max = metricdata.NewExtrema(val.max)
+			if val.minMax.set.Load() {
+				hDPts[i].Min = metricdata.NewExtrema(val.minMax.minimum.Load())
+				hDPts[i].Max = metricdata.NewExtrema(val.minMax.maximum.Load())
+			}
 		}

 		collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

 		i++
-	}
+		return true
+	})

 	// Unused attribute sets do not report.
-	clear(s.values)
+	s.hotColdValMap[readIdx].Clear()
 	// The delta collection cycle resets.
 	s.start = t
@@ -203,7 +228,101 @@ func (s *histogram[N]) delta(
 	return n
 }

-func (s *histogram[N]) cumulative(
+// cumulativeHistogram summarizes a set of measurements as a histogram with
+// explicitly defined buckets.
+//
+// cumulativeHistogram's measure is implemented without locking, even when
+// called concurrently with collect. This is done by maintaining two separate
+// histogramPointCounters for each attribute set: one "hot" which is
+// concurrently updated by measure(), and one "cold", which is read and reset
+// by collect(). The [hotColdWaitGroup] allows collect() to swap the hot and
+// cold counters, and wait for updates to the cold counters to complete prior
+// to reading. Unlike deltaHistogram, this maintains a single map so that the
+// preserved attribute sets do not change when collect() is called.
+type cumulativeHistogram[N int64 | float64] struct {
+	values limitedSyncMap
+
+	start    time.Time
+	noMinMax bool
+	noSum    bool
+	bounds   []float64
+	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
+}
+
+// newCumulativeHistogram returns a histogram that accumulates measurements
+// into a histogram data structure. It is never reset.
+func newCumulativeHistogram[N int64 | float64](
+	boundaries []float64,
+	noMinMax, noSum bool,
+	limit int,
+	r func(attribute.Set) FilteredExemplarReservoir[N],
+) *cumulativeHistogram[N] {
+	// The responsibility of keeping all histogram points correctly associated with the
+	// passed boundaries is ultimately this type's responsibility. Make a copy
+	// here so we can always guarantee this. Or, in the case of failure, have
+	// complete control over the fix.
+	b := slices.Clone(boundaries)
+	slices.Sort(b)
+	return &cumulativeHistogram[N]{
+		start:    now(),
+		noMinMax: noMinMax,
+		noSum:    noSum,
+		bounds:   b,
+		newRes:   r,
+		values:   limitedSyncMap{aggLimit: limit},
+	}
+}
+
+func (s *cumulativeHistogram[N]) measure(
+	ctx context.Context,
+	value N,
+	fltrAttr attribute.Set,
+	droppedAttr []attribute.KeyValue,
+) {
+	h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
+		hPt := &hotColdHistogramPoint[N]{
+			res:   s.newRes(attr),
+			attrs: attr,
+			// N+1 buckets. For example:
+			//
+			//   bounds = [0, 5, 10]
+			//
+			// Then,
+			//
+			//   counts = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
+			hotColdPoint: [2]histogramPointCounters[N]{
+				{
+					counts: make([]atomic.Uint64, len(s.bounds)+1),
+				},
+				{
+					counts: make([]atomic.Uint64, len(s.bounds)+1),
+				},
+			},
+		}
+		return hPt
+	}).(*hotColdHistogramPoint[N])
+
+	// This search will return an index in the range [0, len(s.bounds)], where
+	// it will return len(s.bounds) if value is greater than the last element
+	// of s.bounds. This aligns with the counts in that the length of counts
+	// is len(s.bounds)+1, with the last bucket representing:
+	// (s.bounds[len(s.bounds)-1], +∞).
+	idx := sort.SearchFloat64s(s.bounds, float64(value))
+
+	hotIdx := h.hcwg.start()
+	defer h.hcwg.done(hotIdx)
+
+	h.hotColdPoint[hotIdx].counts[idx].Add(1)
+	if !s.noMinMax {
+		h.hotColdPoint[hotIdx].minMax.Update(value)
+	}
+	if !s.noSum {
+		h.hotColdPoint[hotIdx].total.add(value)
+	}
+	h.res.Offer(ctx, value, droppedAttr)
+}
+
+func (s *cumulativeHistogram[N]) collect(
 	dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
 ) int {
 	t := now()
@@ -213,50 +332,58 @@ func (s *histogram[N]) cumulative(
 	h, _ := (*dest).(metricdata.Histogram[N])
 	h.Temporality = metricdata.CumulativeTemporality

-	s.valuesMu.Lock()
-	defer s.valuesMu.Unlock()
-
 	// Do not allow modification of our copy of bounds.
 	bounds := slices.Clone(s.bounds)

-	n := len(s.values)
-	hDPts := reset(h.DataPoints, n, n)
+	// Values are being concurrently written while we iterate, so only use the
+	// current length for capacity.
+	hDPts := reset(h.DataPoints, 0, s.values.Len())

 	var i int
-	for _, val := range s.values {
-		hDPts[i].Attributes = val.attrs
-		hDPts[i].StartTime = s.start
-		hDPts[i].Time = t
-		hDPts[i].Count = val.count
-		hDPts[i].Bounds = bounds
-
-		// The HistogramDataPoint field values returned need to be copies of
-		// the buckets value as we will keep updating them.
-		//
-		// TODO (#3047): Making copies for bounds and counts incurs a large
-		// memory allocation footprint. Alternatives should be explored.
-		hDPts[i].BucketCounts = slices.Clone(val.counts)
+	s.values.Range(func(_, value any) bool {
+		val := value.(*hotColdHistogramPoint[N])
+		// swap, observe, and clear the point
+		readIdx := val.hcwg.swapHotAndWait()
+		var bucketCounts []uint64
+		count := val.hotColdPoint[readIdx].loadCountsInto(&bucketCounts)
+		newPt := metricdata.HistogramDataPoint[N]{
+			Attributes: val.attrs,
+			StartTime:  s.start,
+			Time:       t,
+			Count:      count,
+			Bounds:     bounds,
+			// The HistogramDataPoint field values returned need to be copies of
+			// the histogramPoint value as we will keep updating them.
+			BucketCounts: bucketCounts,
+		}

 		if !s.noSum {
-			hDPts[i].Sum = val.total
+			newPt.Sum = val.hotColdPoint[readIdx].total.load()
 		}
-
 		if !s.noMinMax {
-			hDPts[i].Min = metricdata.NewExtrema(val.min)
-			hDPts[i].Max = metricdata.NewExtrema(val.max)
+			if val.hotColdPoint[readIdx].minMax.set.Load() {
+				newPt.Min = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.minimum.Load())
+				newPt.Max = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.maximum.Load())
+			}
 		}
+		// Once we've read the point, merge it back into the hot histogram
+		// point since it is cumulative.
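+		// The merge zeroes the cold counts and sum so the next swap starts
+		// from empty, while min and max are carried over without being reset.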
+ hotIdx := (readIdx + 1) % 2 + val.hotColdPoint[readIdx].mergeIntoAndReset(&val.hotColdPoint[hotIdx], s.noMinMax, s.noSum) - collectExemplars(&hDPts[i].Exemplars, val.res.Collect) + collectExemplars(&newPt.Exemplars, val.res.Collect) + hDPts = append(hDPts, newPt) i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. - } + return true + }) h.DataPoints = hDPts *dest = h - return n + return i } diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 6e0f3948de0..0bfa8e9970e 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -330,60 +330,12 @@ func hPoint[N int64 | float64]( } } -func TestBucketsBin(t *testing.T) { - t.Run("Int64", testBucketsBin[int64]()) - t.Run("Float64", testBucketsBin[float64]()) -} - -func testBucketsBin[N int64 | float64]() func(t *testing.T) { - return func(t *testing.T) { - b := newBuckets[N](alice, 3) - assertB := func(counts []uint64, count uint64, mi, ma N) { - t.Helper() - assert.Equal(t, counts, b.counts) - assert.Equal(t, count, b.count) - assert.Equal(t, mi, b.min) - assert.Equal(t, ma, b.max) - } - - assertB([]uint64{0, 0, 0}, 0, 0, 0) - b.bin(1) - b.minMax(2) - assertB([]uint64{0, 1, 0}, 1, 0, 2) - b.bin(0) - b.minMax(-1) - assertB([]uint64{1, 1, 0}, 2, -1, 2) - } -} - -func TestBucketsSum(t *testing.T) { - t.Run("Int64", testBucketsSum[int64]()) - t.Run("Float64", testBucketsSum[float64]()) -} - -func testBucketsSum[N int64 | float64]() func(t *testing.T) { - return func(t *testing.T) { - b := newBuckets[N](alice, 3) - - var want N - assert.Equal(t, want, b.total) - - b.sum(2) - want = 2 - assert.Equal(t, want, b.total) - - b.sum(-1) - want = 1 - assert.Equal(t, want, b.total) - } -} - func TestHistogramImmutableBounds(t *testing.T) { b := []float64{0, 1, 2} cpB := make([]float64, len(b)) copy(cpB, b) - h := newHistogram[int64](b, false, false, 0, dropExemplars[int64]) + h := newCumulativeHistogram[int64](b, false, false, 0, dropExemplars[int64]) require.Equal(t, cpB, h.bounds) b[0] = 10 @@ -392,29 +344,42 @@ func TestHistogramImmutableBounds(t *testing.T) { h.measure(t.Context(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - h.cumulative(&data) + h.collect(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] hdp.Bounds[1] = 10 assert.Equal(t, cpB, h.bounds, "modifying the Aggregation bounds should not change the bounds") } func TestCumulativeHistogramImmutableCounts(t *testing.T) { - h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) + h := newCumulativeHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) h.measure(t.Context(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - h.cumulative(&data) + h.collect(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] - require.Equal(t, hdp.BucketCounts, h.values[alice.Equivalent()].counts) + hPt, ok := h.values.Load(alice.Equivalent()) + require.True(t, ok) + hcHistPt := hPt.(*hotColdHistogramPoint[int64]) + readIdx := hcHistPt.hcwg.swapHotAndWait() + var bucketCounts []uint64 + hcHistPt.hotColdPoint[readIdx].loadCountsInto(&bucketCounts) + require.Equal(t, hdp.BucketCounts, bucketCounts) + hotIdx := (readIdx + 1) % 2 + 
hcHistPt.hotColdPoint[readIdx].mergeIntoAndReset(&hcHistPt.hotColdPoint[hotIdx], noMinMax, false) cpCounts := make([]uint64, len(hdp.BucketCounts)) copy(cpCounts, hdp.BucketCounts) hdp.BucketCounts[0] = 10 + hPt, ok = h.values.Load(alice.Equivalent()) + require.True(t, ok) + hcHistPt = hPt.(*hotColdHistogramPoint[int64]) + readIdx = hcHistPt.hcwg.swapHotAndWait() + hcHistPt.hotColdPoint[readIdx].loadCountsInto(&bucketCounts) assert.Equal( t, cpCounts, - h.values[alice.Equivalent()].counts, + bucketCounts, "modifying the Aggregator bucket counts should not change the Aggregator", ) } @@ -424,28 +389,28 @@ func TestDeltaHistogramReset(t *testing.T) { now = func() time.Time { return y2k } t.Cleanup(func() { now = orig }) - h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) + h := newDeltaHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - require.Equal(t, 0, h.delta(&data)) + require.Equal(t, 0, h.collect(&data)) require.Empty(t, data.(metricdata.Histogram[int64]).DataPoints) h.measure(t.Context(), 1, alice, nil) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1, now(), now())} - h.delta(&data) + h.collect(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) // The attr set should be forgotten once Aggregations is called. expect.DataPoints = nil - assert.Equal(t, 0, h.delta(&data)) + assert.Equal(t, 0, h.collect(&data)) assert.Empty(t, data.(metricdata.Histogram[int64]).DataPoints) // Aggregating another set should not affect the original (alice). h.measure(t.Context(), 1, bob, nil) expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1, now(), now())} - h.delta(&data) + h.collect(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 81690855114..66cb68085fd 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -17,12 +17,12 @@ type sumValue[N int64 | float64] struct { attrs attribute.Set } -type valueMap[N int64 | float64] struct { +type sumValueMap[N int64 | float64] struct { values limitedSyncMap newRes func(attribute.Set) FilteredExemplarReservoir[N] } -func (s *valueMap[N]) measure( +func (s *sumValueMap[N]) measure( ctx context.Context, value N, fltrAttr attribute.Set, @@ -52,7 +52,7 @@ func newDeltaSum[N int64 | float64]( return &deltaSum[N]{ monotonic: monotonic, start: now(), - hotColdValMap: [2]valueMap[N]{ + hotColdValMap: [2]sumValueMap[N]{ { values: limitedSyncMap{aggLimit: limit}, newRes: r, @@ -71,7 +71,7 @@ type deltaSum[N int64 | float64] struct { start time.Time hcwg hotColdWaitGroup - hotColdValMap [2]valueMap[N] + hotColdValMap [2]sumValueMap[N] } func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -130,7 +130,7 @@ func newCumulativeSum[N int64 | float64]( return &cumulativeSum[N]{ monotonic: monotonic, start: now(), - valueMap: valueMap[N]{ + sumValueMap: sumValueMap[N]{ values: limitedSyncMap{aggLimit: limit}, newRes: r, }, @@ -142,7 +142,7 @@ type cumulativeSum[N int64 | float64] struct { monotonic bool start time.Time - valueMap[N] + sumValueMap[N] } func (s *cumulativeSum[N]) collect(