2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -51,7 +51,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- Include W3C TraceFlags (bits 0–7) in the OTLP `Span.Flags` field in `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracehttp` and `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracegrpc`. (#7438)
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
-- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)
+- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427, #7474, #7478)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
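The `ErrorType` entry above describes the new behavior rather than showing it. Below is a minimal sketch of a custom error type the updated function would pick up, assuming `semconv.ErrorType` takes an `error` and returns an `attribute.KeyValue` (the `ErrorType() string` contract comes from the entry; the exact signature is an assumption).

package main

import (
	"fmt"

	semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
)

// quotaError is a hypothetical custom error. Because it implements
// ErrorType() string, semconv.ErrorType should report "quota_exceeded"
// rather than the Go type name.
type quotaError struct{ msg string }

func (e quotaError) Error() string     { return e.msg }
func (e quotaError) ErrorType() string { return "quota_exceeded" }

func main() {
	attr := semconv.ErrorType(quotaError{msg: "limit reached"})
	fmt.Println(attr.Key, attr.Value.AsString())
}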
21 changes: 12 additions & 9 deletions sdk/metric/internal/aggregate/aggregate.go
@@ -74,12 +74,13 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {

// LastValue returns a last-value aggregate function input and output.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
-	lv := newLastValue[N](b.AggregationLimit, b.resFunc())
	switch b.Temporality {
	case metricdata.DeltaTemporality:
-		return b.filter(lv.measure), lv.delta
+		lv := newDeltaLastValue[N](b.AggregationLimit, b.resFunc())
+		return b.filter(lv.measure), lv.collect
	default:
-		return b.filter(lv.measure), lv.cumulative
+		lv := newCumulativeLastValue[N](b.AggregationLimit, b.resFunc())
+		return b.filter(lv.measure), lv.collect
	}
}

@@ -126,12 +127,13 @@ func (b Builder[N]) ExplicitBucketHistogram(
	boundaries []float64,
	noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
-	h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
	switch b.Temporality {
	case metricdata.DeltaTemporality:
-		return b.filter(h.measure), h.delta
+		h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
+		return b.filter(h.measure), h.collect
	default:
-		return b.filter(h.measure), h.cumulative
+		h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
+		return b.filter(h.measure), h.collect
	}
}

@@ -141,12 +143,13 @@ func (b Builder[N]) ExponentialBucketHistogram(
	maxSize, maxScale int32,
	noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
-	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
	switch b.Temporality {
	case metricdata.DeltaTemporality:
-		return b.filter(h.measure), h.delta
+		h := newDeltaExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
+		return b.filter(h.measure), h.collect
	default:
-		return b.filter(h.measure), h.cumulative
+		h := newCumulativeExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
+		return b.filter(h.measure), h.collect
	}
}
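All three `Builder` methods above now choose a temporality-specific aggregator at construction time and return its single `collect` function, rather than building one aggregator and selecting between `delta` and `cumulative` later. A minimal, self-contained sketch of that shape — `deltaSum`, `cumulativeSum`, and `build` are hypothetical stand-ins, not the SDK's real types, and concurrency is ignored for brevity:

package main

import "fmt"

// collectFunc stands in for a ComputeAggregation-style callback.
type collectFunc func() int64

// deltaSum resets on every collection (delta temporality).
type deltaSum struct{ n int64 }

func (d *deltaSum) measure(v int64) { d.n += v }

func (d *deltaSum) collect() int64 {
	v := d.n
	d.n = 0
	return v
}

// cumulativeSum keeps its running total across collections.
type cumulativeSum struct{ n int64 }

func (c *cumulativeSum) measure(v int64) { c.n += v }

func (c *cumulativeSum) collect() int64 { return c.n }

// build mirrors the pattern above: the temporality decision is made once,
// at construction, so the measurement path never branches on it.
func build(delta bool) (func(int64), collectFunc) {
	if delta {
		d := &deltaSum{}
		return d.measure, d.collect
	}
	c := &cumulativeSum{}
	return c.measure, c.collect
}

func main() {
	measure, collect := build(true)
	measure(3)
	measure(4)
	fmt.Println(collect()) // 7
	fmt.Println(collect()) // 0: a delta aggregator resets after each collect
}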

150 changes: 150 additions & 0 deletions sdk/metric/internal/aggregate/atomic.go
@@ -51,6 +51,97 @@
}
}

// reset resets the internal state, and is not safe to call concurrently.
func (n *atomicCounter[N]) reset() {
	n.nFloatBits.Store(0)
	n.nInt.Store(0)
}

// atomicN is a generic atomic number value.
type atomicN[N int64 | float64] struct {
	val atomic.Uint64
}

func (a *atomicN[N]) Load() (value N) {
	v := a.val.Load()
	switch any(value).(type) {
	case int64:
		value = N(v)
	case float64:
		value = N(math.Float64frombits(v))
	default:
		panic("unsupported type")
	}
	return value
}

func (a *atomicN[N]) Store(v N) {
	var val uint64
	switch any(v).(type) {
	case int64:
		val = uint64(v)
	case float64:
		val = math.Float64bits(float64(v))
	default:
		panic("unsupported type")
	}
	a.val.Store(val)
}

func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool {
	var o, n uint64
	switch any(oldN).(type) {
	case int64:
		o, n = uint64(oldN), uint64(newN)
	case float64:
		o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN))
	default:
		panic("unsupported type")
	}
	return a.val.CompareAndSwap(o, n)
}
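The `atomicN` type above stores both `int64` and `float64` values in a single `atomic.Uint64` by reinterpreting their bit patterns. A standalone sketch of the float case, using only the standard library:

package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var bits atomic.Uint64

	// Store a float64 by converting it to its IEEE-754 bit pattern.
	bits.Store(math.Float64bits(3.75))

	// Load it back by reinterpreting the bits as a float64.
	fmt.Println(math.Float64frombits(bits.Load())) // 3.75

	// CompareAndSwap also operates on bit patterns, which is why atomicN
	// converts both the old and new values before calling it.
	swapped := bits.CompareAndSwap(math.Float64bits(3.75), math.Float64bits(4.0))
	fmt.Println(swapped) // true
}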

// atomicMinMax atomically tracks the minimum and maximum values observed.
type atomicMinMax[N int64 | float64] struct {
	minimum, maximum atomicN[N]
	set              atomic.Bool
	mu               sync.Mutex
}

// init returns true if the value was used to initialize min and max.
func (s *atomicMinMax[N]) init(val N) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.set.Load() {
		defer s.set.Store(true)
		s.minimum.Store(val)
		s.maximum.Store(val)
		return true
	}
	return false
}

// Update records val, lowering the minimum and/or raising the maximum as
// needed. It is safe to call concurrently.
func (s *atomicMinMax[N]) Update(val N) {
	if !s.set.Load() && s.init(val) {
		return
	}

	old := s.minimum.Load()
	for val < old {
		if s.minimum.CompareAndSwap(old, val) {
			return
		}
		old = s.minimum.Load()
	}

	old = s.maximum.Load()
	for old < val {
		if s.maximum.CompareAndSwap(old, val) {
			return
		}
		old = s.maximum.Load()
	}
}
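`Update` above uses the usual lock-free recipe for maintaining a minimum or maximum: load the current value, and only attempt a compare-and-swap while the new value would still improve it, retrying on contention. A self-contained sketch of the same pattern for an `int64` minimum:

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// updateMin lowers min to val if val is smaller, retrying the
// compare-and-swap until it either wins or observes a value that is
// already <= val.
func updateMin(min *atomic.Int64, val int64) {
	old := min.Load()
	for val < old {
		if min.CompareAndSwap(old, val) {
			return
		}
		old = min.Load()
	}
}

func main() {
	var min atomic.Int64
	min.Store(math.MaxInt64)

	var wg sync.WaitGroup
	for v := int64(1); v <= 100; v++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			updateMin(&min, v)
		}(v)
	}
	wg.Wait()
	fmt.Println(min.Load()) // 1
}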

// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
@@ -102,6 +193,10 @@
	return l.startedCountAndHotIdx.Add(1) >> 63
}

// loadHot returns the current hot index without registering an operation.
// It is safe to call concurrently.
func (l *hotColdWaitGroup) loadHot() uint64 {
	return l.startedCountAndHotIdx.Load() >> 63
}
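The `>> 63` in `add` and `loadHot` implies `startedCountAndHotIdx` keeps the hot index in the top bit and a count of started operations in the lower 63 bits — an inference from this hunk (it matches the packing Prometheus' Go client uses for its histograms), not something the diff states. A standalone sketch of reading and flipping such a packed word:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var packed atomic.Uint64

	// Flip the hot index by toggling the top bit; the lower 63 bits
	// (the started count) are left untouched.
	packed.Add(1 << 63)

	// A writer registers itself and learns which side is hot in a single
	// atomic step: Add(1) bumps the count, >> 63 extracts the hot index.
	hot := packed.Add(1) >> 63
	started := packed.Load() & ((1 << 63) - 1)
	fmt.Println(hot, started) // 1 1
}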

// done signals to the reader that an operation has fully completed.
// done is safe to call concurrently.
func (l *hotColdWaitGroup) done(hotIdx uint64) {
@@ -182,3 +277,58 @@
	defer m.lenMux.Unlock()
	return m.len
}

// atomicLimitedRange is a range which can grow to at most maxSize. It is used
// to emulate a slice which is bounded in size, but can grow in either
// direction from any starting index.
type atomicLimitedRange struct {
	startAndEnd atomic.Uint64
	maxSize     int32
}

// Load returns the current [start, end) bounds of the range.
func (r *atomicLimitedRange) Load() (start, end int32) {
	n := r.startAndEnd.Load()
	return int32(n >> 32), int32(n & ((1 << 32) - 1))
}

// Store atomically replaces the bounds with [start, end).
func (r *atomicLimitedRange) Store(start, end int32) {
	// end must be cast to a uint32 first to avoid sign extension.
	r.startAndEnd.Store(uint64(start)<<32 | uint64(uint32(end)))
}

// Add expands the range to include idx, keeping its width at most maxSize.
// It reports whether idx fits inside the (possibly expanded) range.
func (r *atomicLimitedRange) Add(idx int32) bool {
	for {
		n := r.startAndEnd.Load()
		start, end := int32(n>>32), int32(n&((1<<32)-1))
		if idx >= start && idx < end {
			// no expansion needed
			return true
		}

		// If idx doesn't fit, still expand as far as possible in that
		// direction to prevent the range from growing in the opposite
		// direction. This ensures the following scale change is able to fit
		// our point after the change.
		partialExpansion := false
		if start == end {
			start = idx
			end = idx + 1
		} else if idx < start {
			start = idx
			if end-start > r.maxSize {
				start = end - r.maxSize
				partialExpansion = true
			}
		} else if idx >= end {
			end = idx + 1
			if end-start > r.maxSize {
				end = start + r.maxSize
				partialExpansion = true
			}
		}
		if !r.startAndEnd.CompareAndSwap(n, uint64(start)<<32|uint64(uint32(end))) {
			continue
		}
		return !partialExpansion
	}
}

[Check failure on line 313 in sdk/metric/internal/aggregate/atomic.go — GitHub Actions / lint: ifElseChain: rewrite if-else to switch statement (gocritic)]
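`atomicLimitedRange` packs the two signed 32-bit bounds into one `atomic.Uint64` so the `[start, end)` pair can be read and swapped in a single atomic operation. A standalone sketch of the pack/unpack arithmetic, including the sign-extension hazard the `Store` comment warns about (the helper names are illustrative only):

package main

import "fmt"

// pack stores start in the high 32 bits and end in the low 32 bits.
// Casting end to uint32 first keeps a negative end from sign-extending
// into (and clobbering) the high half that holds start.
func pack(start, end int32) uint64 {
	return uint64(start)<<32 | uint64(uint32(end))
}

// unpack reverses pack; converting back to int32 restores the sign.
func unpack(n uint64) (start, end int32) {
	return int32(n >> 32), int32(n & ((1 << 32) - 1))
}

func main() {
	n := pack(-20, -1)
	start, end := unpack(n)
	fmt.Println(start, end) // -20 -1

	// Without the uint32 cast, a negative end would set all of the high
	// 32 bits and corrupt start:
	s, e := int64(-20), int64(-1)
	bad := uint64(s)<<32 | uint64(e)
	badStart, badEnd := unpack(bad)
	fmt.Println(badStart, badEnd) // -1 -1 (start is lost)
}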
32 changes: 32 additions & 0 deletions sdk/metric/internal/aggregate/atomic_test.go
@@ -76,3 +76,35 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
	}
	wg.Wait()
}

func TestAtomicLimitedRange(t *testing.T) {
	a := &atomicLimitedRange{maxSize: 20}
	start, end := a.Load()
	assert.Equal(t, int32(0), start)
	assert.Equal(t, int32(0), end)
	a.Store(-20, -1)
	start, end = a.Load()
	assert.Equal(t, int32(-20), start)
	assert.Equal(t, int32(-1), end)
	a.Store(0, 0)
	start, end = a.Load()
	assert.Equal(t, int32(0), start)
	assert.Equal(t, int32(0), end)
	assert.True(t, a.Add(10))
	start, end = a.Load()
	assert.Equal(t, int32(10), start)
	assert.Equal(t, int32(11), end)
	assert.True(t, a.Add(20))
	start, end = a.Load()
	assert.Equal(t, int32(10), start)
	assert.Equal(t, int32(21), end)
	// Exceeds maxSize by 1.
	assert.False(t, a.Add(0))
	start, end = a.Load()
	assert.Equal(t, int32(1), start)
	assert.Equal(t, int32(21), end)
	a.Store(-3, -2)
	start, end = a.Load()
	assert.Equal(t, int32(-3), start)
	assert.Equal(t, int32(-2), end)
}